diff --git a/app/app.go b/app/app.go index 144989f..d2fb3e3 100644 --- a/app/app.go +++ b/app/app.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "os/signal" + "sync" "syscall" log "github.com/Sirupsen/logrus" @@ -18,24 +19,38 @@ import ( ) var ( - config *conf.Config chain *blockchain.BlockChain logFile = os.Stdout // A reference to the transaction pool tpool *pool.Pool ) +// App contains information about a running instance of a Cumulus node +type App struct { + PeerStore *peer.PeerStore +} + // Run sets up and starts a new Cumulus node with the -// given configuration. +// given configuration. This should only be called once (except in tests) func Run(cfg conf.Config) { log.Info("Starting Cumulus node") - config = &cfg + config := &cfg + + addr := fmt.Sprintf("%s:%d", config.Interface, config.Port) + a := App{ + PeerStore: peer.NewPeerStore(addr), + } // Set logging level if cfg.Verbose { log.SetLevel(log.DebugLevel) } + // We'll need to wait on at least 2 goroutines (Listen and + // MaintainConnections) to start before returning + wg := &sync.WaitGroup{} + wg.Add(2) + // Start a goroutine that waits for program termination. Before the program // exits it will flush logs and save the blockchain. c := make(chan os.Signal, 1) @@ -57,42 +72,43 @@ func Run(cfg conf.Config) { // Set Peer default Push and Request handlers. These functions will handle // request and push messages from all peers we connect to unless overridden // for specific peers by calls like p.SetRequestHandler(someHandler) - peer.SetDefaultPushHandler(PushHandler) - peer.SetDefaultRequestHandler(RequestHandler) + a.PeerStore.SetDefaultPushHandler(a.PushHandler) + a.PeerStore.SetDefaultRequestHandler(a.RequestHandler) // Start listening on the given interface and port so we can receive // conenctions from other peers log.Infof("Starting listener on %s:%d", cfg.Interface, cfg.Port) - peer.ListenAddr = fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port) + a.PeerStore.ListenAddr = addr go func() { - address := fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port) - err := conn.Listen(address, peer.ConnectionHandler) + err := conn.Listen(addr, a.PeerStore.ConnectionHandler, wg) if err != nil { - log.WithError( - err, - ).Fatalf("Failed to listen on %s:%d", cfg.Interface, cfg.Port) + log.WithError(err).Fatalf("Failed to listen on %s", addr) } }() // If the console flag was passed, redirect logs to a file and run the console + // NOTE: if the log file already exists we will exit with a fatal error here! + // This should stop people from running multiple Cumulus nodes that will try + // to log to the same file. if cfg.Console { - logFile, err := os.OpenFile("logfile", os.O_WRONLY|os.O_CREATE, 0755) + logFile, err := os.OpenFile("logfile", os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0755) if err != nil { - log.WithError(err).Fatal("Failed to redirect logs to log file") + log.WithError(err).Fatal("Failed to redirect logs to file") } - log.Warn("Redirecting logs to logfile") + log.Warn("Redirecting logs to file") log.SetOutput(logFile) - go RunConsole() + wg.Add(1) + go RunConsole(a.PeerStore, wg) } if len(config.Target) > 0 { // Connect to the target and discover its peers. - ConnectAndDiscover(cfg.Target) + a.ConnectAndDiscover(cfg.Target) } // Try maintain as close to peer.MaxPeers connections as possible while this // peer is running - go peer.MaintainConnections() + go a.PeerStore.MaintainConnections(wg) // Request the blockchain. if chain == nil { @@ -100,30 +116,28 @@ func Run(cfg conf.Config) { initializeChain() } - // Return to command line. - select {} // Hang main thread. Everything happens in goroutines from here + // Wait for goroutines to start + wg.Wait() } // ConnectAndDiscover tries to connect to a target and discover its peers. -func ConnectAndDiscover(target string) { +func (a App) ConnectAndDiscover(target string) { peerInfoRequest := msg.Request{ ID: uuid.New().String(), ResourceType: msg.ResourcePeerInfo, } log.Infof("Dialing target %s", target) - c, err := conn.Dial(target) + p, err := peer.Connect(target, a.PeerStore) if err != nil { - log.WithError(err).Fatalf("Failed to connect to target") + log.WithError(err).Fatal("Failed to dial target") } - peer.ConnectionHandler(c) - p := peer.PStore.Get(c.RemoteAddr().String()) - p.Request(peerInfoRequest, peer.PeerInfoHandler) + p.Request(peerInfoRequest, a.PeerStore.PeerInfoHandler) } -// RequestHandler is called every time a peer sends us a request message except -// on peers whos RequestHandlers have been overridden. -func RequestHandler(req *msg.Request) msg.Response { +// RequestHandler is called every time a peer sends us a request message expect +// on peers whos PushHandlers have been overridden. +func (a App) RequestHandler(req *msg.Request) msg.Response { res := msg.Response{ID: req.ID} // Build some error types. @@ -134,7 +148,7 @@ func RequestHandler(req *msg.Request) msg.Response { switch req.ResourceType { case msg.ResourcePeerInfo: - res.Resource = peer.PStore.Addrs() + res.Resource = a.PeerStore.Addrs() case msg.ResourceBlock: // Block is requested by number. blockNumber, ok := req.Params["blockNumber"].(uint32) @@ -161,7 +175,7 @@ func RequestHandler(req *msg.Request) msg.Response { // PushHandler is called every time a peer sends us a Push message except on // peers whos PushHandlers have been overridden. -func PushHandler(push *msg.Push) { +func (a App) PushHandler(push *msg.Push) { switch push.ResourceType { case msg.ResourceBlock: blk, ok := push.Resource.(*blockchain.Block) diff --git a/app/app_test.go b/app/app_test.go index 312095a..6e1083b 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/ubclaunchpad/cumulus/blockchain" "github.com/ubclaunchpad/cumulus/msg" + "github.com/ubclaunchpad/cumulus/peer" ) func init() { @@ -68,7 +69,10 @@ func TestPushHandlerNewBlock(t *testing.T) { ResourceType: msg.ResourceBlock, Resource: b, } - PushHandler(&push) + a := App{ + PeerStore: peer.NewPeerStore("127.0.0.1:8000"), + } + a.PushHandler(&push) select { case work, ok := <-BlockWorkQueue: if !ok { @@ -88,7 +92,10 @@ func TestPushHandlerNewTestTransaction(t *testing.T) { ResourceType: msg.ResourceTransaction, Resource: txn, } - PushHandler(&push) + a := App{ + PeerStore: peer.NewPeerStore("127.0.0.1:8000"), + } + a.PushHandler(&push) select { case work, ok := <-TransactionWorkQueue: if !ok { @@ -103,12 +110,13 @@ func TestPushHandlerNewTestTransaction(t *testing.T) { func TestRequestHandlerNewBlockOK(t *testing.T) { initializeChain() + a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} // Set up a request (requesting block 0) blockNumber := uint32(0) req := createNewBlockRequest(blockNumber) - resp := RequestHandler(req) + resp := a.RequestHandler(req) block, ok := resp.Resource.(*blockchain.Block) // Assertion time! @@ -119,12 +127,13 @@ func TestRequestHandlerNewBlockOK(t *testing.T) { func TestRequestHandlerNewBlockBadParams(t *testing.T) { initializeChain() + a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} // Set up a request. blockNumber := "definitelynotanindex" req := createNewBlockRequest(blockNumber) - resp := RequestHandler(req) + resp := a.RequestHandler(req) block, ok := resp.Resource.(*blockchain.Block) // Make sure request failed. @@ -134,12 +143,13 @@ func TestRequestHandlerNewBlockBadParams(t *testing.T) { func TestRequestHandlerNewBlockBadType(t *testing.T) { initializeChain() + a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} // Set up a request. req := createNewBlockRequest("doesntmatter") req.ResourceType = 25 - resp := RequestHandler(req) + resp := a.RequestHandler(req) block, ok := resp.Resource.(*blockchain.Block) // Make sure request failed. @@ -149,12 +159,13 @@ func TestRequestHandlerNewBlockBadType(t *testing.T) { func TestRequestHandlerPeerInfo(t *testing.T) { initializeChain() + a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} // Set up a request. req := createNewBlockRequest("doesntmatter") req.ResourceType = msg.ResourcePeerInfo - resp := RequestHandler(req) + resp := a.RequestHandler(req) res := resp.Resource // Make sure request did not fail. diff --git a/app/console.go b/app/console.go index 8004e60..10fedef 100644 --- a/app/console.go +++ b/app/console.go @@ -2,19 +2,29 @@ package app import ( "strconv" + "sync" "github.com/abiosoft/ishell" "github.com/ubclaunchpad/cumulus/blockchain" - "github.com/ubclaunchpad/cumulus/conn" "github.com/ubclaunchpad/cumulus/peer" ) -var shell *ishell.Shell +var ( + shell *ishell.Shell + peerStore *peer.PeerStore +) // RunConsole starts the Cumulus console. This should be run only once as a // goroutine, and logging should be redirected away from stdout before it is run. -func RunConsole() { +// It takes a pointer to a PeerStore so we can use the PeerStore to interact +// with other peers and give the user info about the running instance. +func RunConsole(ps *peer.PeerStore, wg *sync.WaitGroup) { + // Wait for services to start before we star the console + wg.Wait() + + peerStore = ps shell = ishell.New() + shell.AddCmd(&ishell.Cmd{ Name: "create", Help: "create a new wallet hash or transaction", @@ -26,7 +36,7 @@ func RunConsole() { Func: check, }) shell.AddCmd(&ishell.Cmd{ - Name: "listen-address", + Name: "address", Help: "show the address this host is listening on", Func: listenAddr, }) @@ -83,11 +93,11 @@ func check(ctx *ishell.Context) { } func listenAddr(ctx *ishell.Context) { - shell.Println("Listening on", peer.ListenAddr) + shell.Println("Listening on", peerStore.ListenAddr) } func peers(tcx *ishell.Context) { - shell.Println("Connected to", peer.PStore.Addrs()) + shell.Println("Connected to", peerStore.Addrs()) } func connect(ctx *ishell.Context) { @@ -97,14 +107,9 @@ func connect(ctx *ishell.Context) { } addr := ctx.Args[0] - c, err := conn.Dial(addr) + _, err := peer.Connect(addr, peerStore) if err != nil { - shell.Println("Failed to dial peer", addr, ":", err) - return - } - peer.ConnectionHandler(c) - if peer.PStore.Get(addr) == nil { - shell.Println("Failed to extablish connection. See logs for details.") + shell.Println("Failed to extablish connection:", err) } else { shell.Println("Connected to", addr) } diff --git a/cmd/run.go b/cmd/run.go index 087cce6..e0e72ec 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -26,7 +26,12 @@ var runCmd = &cobra.Command{ Verbose: verbose, Console: console, } + + // Start the application app.Run(config) + + // Hang main thread. Everything happens in goroutines from here + select {} }, } diff --git a/conn/conn.go b/conn/conn.go index f0f182f..53967ee 100644 --- a/conn/conn.go +++ b/conn/conn.go @@ -2,6 +2,7 @@ package conn import ( "net" + "sync" ) // Dial opens a connection to a remote host. `host` should be a string @@ -12,8 +13,10 @@ func Dial(host string) (net.Conn, error) { // Listen binds to a TCP port and waits for incoming connections. // When a connection is accepted, dispatches to the handler. -func Listen(iface string, handler func(net.Conn)) error { +// Calls Done on waitgroup to signal that we are now listening. +func Listen(iface string, handler func(net.Conn), wg *sync.WaitGroup) error { listener, err := net.Listen("tcp", iface) + wg.Done() if err != nil { return err } diff --git a/conn/conn_test.go b/conn/conn_test.go index 32f30fc..26bda68 100644 --- a/conn/conn_test.go +++ b/conn/conn_test.go @@ -18,7 +18,7 @@ func TestConnect(t *testing.T) { func TestListen(t *testing.T) { wg := sync.WaitGroup{} - wg.Add(5) + wg.Add(6) handler := func(c net.Conn) { defer c.Close() @@ -33,7 +33,7 @@ func TestListen(t *testing.T) { wg.Done() } - go Listen(":8080", handler) + go Listen(":8080", handler, &wg) // Sleep to guarantee that our listener is ready when we start making connections time.Sleep(time.Millisecond) diff --git a/glide.lock b/glide.lock index 205f55e..3a534c2 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 338e5ac0b385f380b7d42f2b6d1f80449b3bf77a2c3cfee3ac34a0afac0ef973 -updated: 2017-07-09T10:45:43.507620923-07:00 +hash: 00eff548e3fa93ce1f0993837ab114a04132c04182ffeddef1e503c9e0f8026b +updated: 2017-07-11T21:51:51.426419168-07:00 imports: - name: github.com/abiosoft/ishell version: a24a06e34a7a9d8f894fb339b3e77333e4fc75ac @@ -24,6 +24,15 @@ imports: - hcl/strconv - json/scanner - json/token +- name: github.com/huin/goupnp + version: 73053506a919bb1af6beb97feb0d53a1eb814eb1 + subpackages: + - dcps/internetgateway1 + - dcps/internetgateway2 + - soap + - httpu + - scpd + - ssdp - name: github.com/inconshreveable/mousetrap version: 76626ae9c91c4f2a10f34cad8ce83ea42c93bb75 - name: github.com/magiconair/properties @@ -36,6 +45,12 @@ imports: repo: https://github.com/mattn/go-isatty - name: github.com/mitchellh/mapstructure version: d0303fe809921458f417bcf828397a65db30a7e4 +- name: github.com/onsi/ginkgo + version: 77a8c1e5c40d6bb6c5eb4dd4bdce9763564f6298 + subpackages: + - ginkgo +- name: github.com/onsi/gomega + version: 334b8f472b3af5d541c5642701c1e29e2126f486 - name: github.com/pelletier/go-toml version: 69d355db5304c0f7f809a2edc054553e7142f016 - name: github.com/Sirupsen/logrus @@ -58,6 +73,12 @@ imports: version: 69483b4bd14f5845b5a1e55bca19e954e827f1d0 subpackages: - assert +- name: golang.org/x/net + version: f01ecb60fe3835d80d9a0b7b2bf24b228c89260e + subpackages: + - html/charset + - html + - html/atom - name: golang.org/x/sys version: 4ed4d404df456f81e878683a0363ed3013a59003 subpackages: @@ -67,11 +88,25 @@ imports: subpackages: - transform - unicode/norm + - encoding + - encoding/charmap + - encoding/htmlindex + - encoding/internal/identifier + - encoding/internal + - encoding/japanese + - encoding/korean + - encoding/simplifiedchinese + - encoding/traditionalchinese + - encoding/unicode + - language + - internal/utf8internal + - runes + - internal/tag - name: gopkg.in/yaml.v2 version: cd8b52f8269e0feb286dfeef29f8fe4d5b397e0b testImports: - name: github.com/davecgh/go-spew - version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 + version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 subpackages: - spew - name: github.com/pmezard/go-difflib diff --git a/peer/peer.go b/peer/peer.go index 1ad4544..835a732 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -1,7 +1,7 @@ package peer import ( - "fmt" + "errors" "io" "net" "strconv" @@ -24,7 +24,7 @@ const ( DefaultIP = "127.0.0.1" // MessageWaitTime is the amount of time the dispatcher should wait before // attempting to read from the connection again when no data was received - MessageWaitTime = time.Second * 5 + MessageWaitTime = time.Second * 2 // MaxPeers is the maximum number of peers we can be connected to at a time MaxPeers = 50 // PeerSearchWaitTime is the amount of time the maintainConnections goroutine @@ -33,81 +33,14 @@ const ( PeerSearchWaitTime = time.Second * 10 ) -var ( - // PStore stores information about every peer we are connected to. All peers - // we connect to should have a reference to this peerstore so they can - // populate it. - PStore = &PeerStore{peers: make(map[string]*Peer, 0)} - // ListenAddr is the TCP address this host is listening on - ListenAddr string - defaultRequestHandler RequestHandler - defaultPushHandler PushHandler -) - -// PeerStore is a thread-safe container for all the peers we are currently -// connected to. It maps remote peer listen addresses to Peer objects. -type PeerStore struct { - peers map[string]*Peer - lock sync.RWMutex -} - -// NewPeerStore returns an initialized peerstore. -func NewPeerStore() *PeerStore { - return &PeerStore{ - peers: make(map[string]*Peer, 0), - lock: sync.RWMutex{}, - } -} - -// Add synchronously adds the given peer to the peerstore -func (ps *PeerStore) Add(p *Peer) { - ps.lock.Lock() - defer ps.lock.Unlock() - ps.peers[p.ListenAddr] = p -} - -// Remove synchronously removes the given peer from the peerstore -func (ps *PeerStore) Remove(addr string) { - ps.lock.Lock() - defer ps.lock.Unlock() - delete(ps.peers, addr) -} - -// RemoveRandom removes a random peer from the given PeerStore -func (ps *PeerStore) RemoveRandom() { - ps.lock.Lock() - defer ps.lock.Unlock() - for _, p := range ps.peers { - delete(ps.peers, p.ListenAddr) - break - } -} - -// Get synchronously retreives the peer with the given id from the peerstore -func (ps *PeerStore) Get(addr string) *Peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - return ps.peers[addr] -} +// ResponseHandler is any function that handles a response to a request. +type ResponseHandler func(*msg.Response) -// Addrs returns the list of addresses of the peers in the peerstore in the form -// : -func (ps *PeerStore) Addrs() []string { - ps.lock.RLock() - defer ps.lock.RUnlock() - addrs := make([]string, 0) - for addr := range ps.peers { - addrs = append(addrs, addr) - } - return addrs -} +// PushHandler is any function that handles a push message. +type PushHandler func(*msg.Push) -// Size synchornously returns the number of peers in the PeerStore -func (ps *PeerStore) Size() int { - ps.lock.RLock() - defer ps.lock.RUnlock() - return len(ps.peers) -} +// RequestHandler is any function that returns a response to a request. +type RequestHandler func(*msg.Request) msg.Response // Peer represents a remote peer we are connected to. type Peer struct { @@ -120,49 +53,34 @@ type Peer struct { lock sync.RWMutex } -// ResponseHandler is any function that handles a response to a request. -type ResponseHandler func(*msg.Response) - -// PushHandler is any function that handles a push message. -type PushHandler func(*msg.Push) - -// RequestHandler is any function that returns a response to a request. -type RequestHandler func(*msg.Request) msg.Response - // New returns a new Peer func New(c net.Conn, ps *PeerStore, listenAddr string) *Peer { return &Peer{ Connection: c, Store: ps, ListenAddr: listenAddr, - requestHandler: defaultRequestHandler, - pushHandler: defaultPushHandler, + requestHandler: ps.defaultRequestHandler, + pushHandler: ps.defaultPushHandler, responseHandlers: make(map[string]ResponseHandler), } } -// ConnectionHandler is called when a new connection is opened with us by a -// remote peer. It will create a dispatcher and message handlers to handle -// retrieving messages over the new connection and sending them to App. -func ConnectionHandler(c net.Conn) { - // Before we can continue we must exchange listen addresses - addr, err := exchangeListenAddrs(c, PeerSearchWaitTime) +// Connect attempts to establish a connection with a peer given its listen +// address (in the form :). If successful returns the +// peer, otherwise returns error. Once the connection is established the peer +// will be added to the given PeerStore and returned. +func Connect(address string, ps *PeerStore) (*Peer, error) { + c, err := conn.Dial(address) if err != nil { - log.WithError(err).Error("Failed to retrieve peer listen address") - return + return nil, err } - p := New(c, PStore, addr) - - // If we are already at MaxPeers, disconnect from a peer to connect to a new - // one. This way nobody gets choked out of the network because everybody - // happens to be fully connected. - if PStore.Size() >= MaxPeers { - PStore.RemoveRandom() + ps.ConnectionHandler(c) + p := ps.Get(c.RemoteAddr().String()) + if p == nil { + // This will only be the case if we exchangeListedAddrs fails + return nil, errors.New("Failed to exchange listen addresses with peer") } - p.Store.Add(p) - - go p.Dispatch() - log.Infof("Connected to %s", p.ListenAddr) + return p, nil } // SetRequestHandler will add the given request handler to this peer. The @@ -179,20 +97,6 @@ func (p *Peer) SetPushHandler(ph PushHandler) { p.pushHandler = ph } -// SetDefaultRequestHandler will ensure that all new peers created who's -// RequestHandlers have not been set will use the given request handler by default -// until it is overridden by the call to SetRequestHandler(). -func SetDefaultRequestHandler(rh RequestHandler) { - defaultRequestHandler = rh -} - -// SetDefaultPushHandler will ensure that all new peers created who's -// PushHandlers have not been set will use the given request handler by default -// until it is overridden by the call to SetPushHandler(). -func SetDefaultPushHandler(ph PushHandler) { - defaultPushHandler = ph -} - // Dispatch listens on this peer's Connection and passes received messages // to the appropriate message handlers. func (p *Peer) Dispatch() { @@ -205,18 +109,16 @@ func (p *Peer) Dispatch() { if err != nil { if err == io.EOF { // This just means the peer hasn't sent anything - select { - case <-time.After(MessageWaitTime): - } + time.Sleep(MessageWaitTime) } else { - log.WithError(err).Error("Dispatcher failed to read message") if strings.Contains(err.Error(), syscall.ECONNRESET.Error()) || errCount == 3 { - log.Infof("Disconnecting from peer %s due to %s", - p.Connection.RemoteAddr().String(), err.Error()) + log.WithError(err).Infof("Disconnecting from peer %s", + p.ListenAddr) p.Store.Remove(p.ListenAddr) p.Connection.Close() return } + log.WithError(err).Error("Dispatcher failed to read message") errCount++ } continue @@ -227,7 +129,7 @@ func (p *Peer) Dispatch() { case *msg.Request: if p.requestHandler == nil { log.Errorf("Request received but no request handler set for peer %s", - p.Connection.RemoteAddr().String()) + p.ListenAddr) } else { response := p.requestHandler(message.(*msg.Request)) response.Write(p.Connection) @@ -236,15 +138,16 @@ func (p *Peer) Dispatch() { res := message.(*msg.Response) rh := p.getResponseHandler(res.ID) if rh == nil { - log.Error("Dispatcher could not find response handler for response") + log.Error("Dispatcher could not find response handler for response on peer", + p.ListenAddr) } else { rh(res) p.removeResponseHandler(res.ID) } case *msg.Push: if p.pushHandler == nil { - log.Errorf("Push message received but no push handler set for peer %s", - p.Connection.RemoteAddr().String()) + log.Error("Dispatcher could not find push handler for push message on peer", + p.ListenAddr) } else { p.pushHandler(message.(*msg.Push)) } @@ -270,27 +173,17 @@ func (p *Peer) Push(push msg.Push) error { return push.Write(p.Connection) } -// Broadcast sends the given push message to all peers in the PeerStore at the -// time this function is called. Note that if we fail to write the push message -// to a peer the failure is ignored. Generally this is okay, because push -// messages sent via Broadcast() should be propagated by other peers. -func Broadcast(push msg.Push) { - PStore.lock.RLock() - defer PStore.lock.RUnlock() - for _, p := range PStore.peers { - p.Push(push) - } -} - // MaintainConnections will infinitely attempt to maintain as close to MaxPeers // connections as possible by requesting PeerInfo from peers in the PeerStore // and establishing connections with newly discovered peers. // NOTE: this should be called only once and should be run as a goroutine. -func MaintainConnections() { +func (ps *PeerStore) MaintainConnections(wg *sync.WaitGroup) { + // Signal that we MaintainConnections is running + wg.Done() for { - peerAddrs := PStore.Addrs() + peerAddrs := ps.Addrs() for i := 0; i < len(peerAddrs); i++ { - if PStore.Size() >= MaxPeers { + if ps.Size() >= MaxPeers { // Already connected to enough peers. Don't try connecting to // any more for a while. break @@ -299,38 +192,17 @@ func MaintainConnections() { ID: uuid.New().String(), ResourceType: msg.ResourcePeerInfo, } - p := PStore.Get(peerAddrs[i]) - if p != nil && peerAddrs[i] != ListenAddr { + p := ps.Get(peerAddrs[i]) + ps.lock.RLock() + if p != nil { // Need to do this check in case the peer got removed - p.Request(peerInfoRequest, PeerInfoHandler) + p.Request(peerInfoRequest, ps.PeerInfoHandler) } + ps.lock.RUnlock() } // Looks like we hit peer.MaxPeers. Wait for a while before checking how many // peers we are connected to. We don't want to spin. - select { - case <-time.After(PeerSearchWaitTime): - } - } -} - -// PeerInfoHandler will handle the response to a PeerInfo request by attempting -// to establish connections with all new peers in the given response Resource. -func PeerInfoHandler(res *msg.Response) { - peers := res.Resource.([]string) - log.Debugf("Found peers %s", peers) - for i := 0; i < len(peers) && PStore.Size() < MaxPeers; i++ { - p := PStore.Get(peers[i]) - if p != nil || peers[i] == ListenAddr { - // We are already connected to this peer. Skip it. - continue - } - newConn, err := conn.Dial(peers[i]) - if err != nil { - log.WithError(err).Errorf("Failed to dial peer %s", peers[i]) - continue - } - ConnectionHandler(newConn) - log.Infof("Connected to %s", newConn.RemoteAddr().String()) + time.Sleep(PeerSearchWaitTime) } } @@ -352,72 +224,6 @@ func (p *Peer) getResponseHandler(id string) ResponseHandler { return p.responseHandlers[id] } -func exchangeListenAddrs(c net.Conn, d time.Duration) (string, error) { - addrChan := make(chan string) - errChan := make(chan error) - - req := msg.Request{ - ID: uuid.New().String(), - ResourceType: msg.ResourcePeerInfo, - } - err := req.Write(c) - if err != nil { - return "", err - } - - // Wait for peer to request our listen address and send us its listen address. - go func() { - receivedAddr := false - sentAddr := false - var addr string - - for !receivedAddr || !sentAddr { - message, err := msg.Read(c) - if err == io.EOF { - continue - } else if err != nil { - errChan <- err - } - - switch message.(type) { - case *msg.Response: - // We got the listen address back - addr = message.(*msg.Response).Resource.(string) - if validAddress(addr) || addr != ListenAddr { - receivedAddr = true - } - case *msg.Request: - if message.(*msg.Request).ResourceType != msg.ResourcePeerInfo { - continue - } - // We got a listen address request. - // Send the remote peer our listen address - res := msg.Response{ - ID: uuid.New().String(), - Resource: ListenAddr, - } - err = res.Write(c) - if err != nil { - errChan <- err - } - sentAddr = true - default: - } - } - - addrChan <- addr - }() - - select { - case addr := <-addrChan: - return addr, nil - case err := <-errChan: - return "", err - case <-time.After(d): - return "", fmt.Errorf("Failed to exchange listen addresses with %s", c.RemoteAddr().String()) - } -} - // validAddress checks if the given TCP/IP address is valid func validAddress(addr string) bool { parts := strings.Split(addr, ":") diff --git a/peer/peer_test.go b/peer/peer_test.go index 028d0be..2c8d2b3 100644 --- a/peer/peer_test.go +++ b/peer/peer_test.go @@ -45,6 +45,8 @@ var ( "19.253.228.59", "195.118.45.237", "159.78.10.205", "206.31.54.66", "31.191.153.165", "130.235.208.32", "130.5.207.98", "5.226.180.24", } + + peerStore PeerStore ) // fakeConn implements net.Conn @@ -88,7 +90,7 @@ func TestMain(m *testing.M) { // This will error if there are concurrent accesses to the PeerStore, or error // if an atomic operation returns un unexpected result. func TestConcurrentPeerStore(t *testing.T) { - ps := NewPeerStore() + ps := NewPeerStore("") resChan1 := make(chan bool) resChan2 := make(chan bool) @@ -158,7 +160,7 @@ func TestConcurrentPeerStore(t *testing.T) { func TestRemoveRandom(t *testing.T) { var fa fakeAddr var fc fakeConn - ps := NewPeerStore() + ps := NewPeerStore("") for _, addr := range addrs1 { fa = fakeAddr{Addr: addr} fc = fakeConn{Addr: fa} @@ -176,7 +178,7 @@ func TestRemoveRandom(t *testing.T) { func TestAddrs(t *testing.T) { var fa fakeAddr var fc fakeConn - ps := NewPeerStore() + ps := NewPeerStore("") for _, addr := range addrs1 { fa = fakeAddr{Addr: addr} fc = fakeConn{Addr: fa} @@ -196,7 +198,9 @@ func TestSetRequestHandler(t *testing.T) { var fc net.Conn fa = fakeAddr{Addr: "127.0.0.1"} fc = fakeConn{Addr: fa} - p := New(fc, PStore, fa.String()) + ps := NewPeerStore("") + + p := New(fc, ps, fa.String()) if p.requestHandler != nil { t.FailNow() } @@ -214,7 +218,7 @@ func TestSetRequestHandler(t *testing.T) { t.FailNow() } - p2 := New(fc, PStore, fa.String()) + p2 := New(fc, ps, fa.String()) if p2.requestHandler != nil { t.FailNow() } @@ -225,7 +229,9 @@ func TestSetPushHandler(t *testing.T) { var fc net.Conn fa = fakeAddr{Addr: "127.0.0.1"} fc = fakeConn{Addr: fa} - p := New(fc, PStore, fa.String()) + ps := NewPeerStore("") + + p := New(fc, ps, fa.String()) if p.pushHandler != nil { t.FailNow() } @@ -238,7 +244,7 @@ func TestSetPushHandler(t *testing.T) { t.FailNow() } - p2 := New(fc, PStore, fa.String()) + p2 := New(fc, ps, fa.String()) if p2.pushHandler != nil { t.FailNow() } @@ -249,7 +255,9 @@ func TestSetDefaultRequestHandler(t *testing.T) { var fc net.Conn fa = fakeAddr{Addr: "127.0.0.1"} fc = fakeConn{Addr: fa} - p := New(fc, PStore, fa.String()) + ps := NewPeerStore("") + + p := New(fc, ps, fa.String()) if p.requestHandler != nil { t.FailNow() } @@ -261,13 +269,13 @@ func TestSetDefaultRequestHandler(t *testing.T) { } } - SetDefaultRequestHandler(rh) + ps.SetDefaultRequestHandler(rh) if p.requestHandler != nil { t.FailNow() } - p2 := New(fc, PStore, fa.String()) + p2 := New(fc, ps, fa.String()) if p2.requestHandler == nil { t.FailNow() } @@ -278,20 +286,22 @@ func TestSetDefaultPushHandler(t *testing.T) { var fc net.Conn fa = fakeAddr{Addr: "127.0.0.1"} fc = fakeConn{Addr: fa} - p := New(fc, PStore, fa.String()) + ps := NewPeerStore("") + + p := New(fc, ps, fa.String()) if p.pushHandler != nil { t.FailNow() } ph := func(req *msg.Push) {} - SetDefaultPushHandler(ph) + ps.SetDefaultPushHandler(ph) if p.pushHandler != nil { t.FailNow() } - p2 := New(fc, PStore, fa.String()) + p2 := New(fc, ps, fa.String()) if p2.pushHandler == nil { t.FailNow() } diff --git a/peer/peerstore.go b/peer/peerstore.go new file mode 100644 index 0000000..3ae4c5c --- /dev/null +++ b/peer/peerstore.go @@ -0,0 +1,229 @@ +package peer + +import ( + "encoding/json" + "fmt" + "io" + "net" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/google/uuid" + "github.com/ubclaunchpad/cumulus/msg" +) + +// PeerStore is a thread-safe container for all the peers we are currently +// connected to. It maps remote peer listen addresses to Peer objects. +type PeerStore struct { + peers map[string]*Peer + ListenAddr string + defaultRequestHandler RequestHandler + defaultPushHandler PushHandler + lock *sync.RWMutex +} + +// NewPeerStore returns an initialized peerstore. +func NewPeerStore(la string) *PeerStore { + return &PeerStore{ + peers: make(map[string]*Peer, 0), + ListenAddr: la, + lock: &sync.RWMutex{}, + } +} + +// ConnectionHandler is called when a new connection is opened with us by a +// remote peer. It will create a dispatcher and message handlers to handle +// retrieving messages over the new connection and sending them to App. +func (ps *PeerStore) ConnectionHandler(c net.Conn) { + // Before we can continue we must exchange listen addresses + addr, err := ps.exchangeListenAddrs(c, PeerSearchWaitTime) + if err != nil { + log.WithError(err).Error("Failed to retrieve peer listen address") + return + } else if ps.Get(addr) != nil || addr == ps.ListenAddr { + // We are already connected to this peer (or it's us), drop the connection + c.Close() + return + } + p := New(c, ps, addr) + + // If we are already at MaxPeers, disconnect from a peer to connect to a new + // one. This way nobody gets choked out of the network because everybody + // happens to be fully connected. + if p.Store.Size() >= MaxPeers { + p.Store.RemoveRandom() + } + p.Store.Add(p) + + go p.Dispatch() + log.Infof("Connected to %s", p.ListenAddr) +} + +// Performs a handshake over the given connection allowing us to send our +// listen address to the remote peer and to receive its litsten address. +// On success returns remote peer listen address, on failure returns error. If +// the given duration passes and we havn't received a listen address we return +// an error. +func (ps *PeerStore) exchangeListenAddrs(c net.Conn, d time.Duration) (string, error) { + addrChan := make(chan string) + errChan := make(chan error) + + req := msg.Request{ + ID: uuid.New().String(), + ResourceType: msg.ResourcePeerInfo, + } + err := req.Write(c) + if err != nil { + return "", err + } + + // Wait for peer to request our listen address and send us its listen address. + go func() { + receivedAddr := false + sentAddr := false + var addr string + + for !receivedAddr || !sentAddr { + message, err := msg.Read(c) + if err == io.EOF { + continue + } else if err != nil { + errChan <- err + } + + switch message.(type) { + case *msg.Response: + // We got the listen address back + addr = message.(*msg.Response).Resource.(string) + if validAddress(addr) || addr != ps.ListenAddr { + receivedAddr = true + } + case *msg.Request: + if message.(*msg.Request).ResourceType != msg.ResourcePeerInfo { + continue + } + // We got a listen address request. + // Send the remote peer our listen address + res := &msg.Response{ + ID: uuid.New().String(), + Resource: ps.ListenAddr, + } + err = res.Write(c) + if err != nil { + errChan <- err + } + sentAddr = true + default: + } + } + + addrChan <- addr + }() + + select { + case addr := <-addrChan: + return addr, nil + case err := <-errChan: + return "", err + case <-time.After(d): + return "", fmt.Errorf("Failed to exchange listen addresses with %s", + c.RemoteAddr().String()) + } +} + +// Add synchronously adds the given peer to the peerstore +func (ps *PeerStore) Add(p *Peer) { + ps.lock.Lock() + defer ps.lock.Unlock() + ps.peers[p.ListenAddr] = p +} + +// Remove synchronously removes the given peer from the peerstore +func (ps *PeerStore) Remove(addr string) { + ps.lock.Lock() + defer ps.lock.Unlock() + delete(ps.peers, addr) +} + +// RemoveRandom removes a random peer from the given PeerStore +func (ps *PeerStore) RemoveRandom() { + ps.lock.Lock() + defer ps.lock.Unlock() + for _, p := range ps.peers { + delete(ps.peers, p.ListenAddr) + break + } +} + +// Get synchronously retreives the peer with the given id from the peerstore +func (ps *PeerStore) Get(addr string) *Peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + return ps.peers[addr] +} + +// SetDefaultRequestHandler will ensure that all new peers created who's +// RequestHandlers have not been set will use the given request handler by default +// until it is overridden by the call to SetRequestHandler(). +func (ps *PeerStore) SetDefaultRequestHandler(rh RequestHandler) { + ps.defaultRequestHandler = rh +} + +// SetDefaultPushHandler will ensure that all new peers created who's +// PushHandlers have not been set will use the given request handler by default +// until it is overridden by the call to SetPushHandler(). +func (ps *PeerStore) SetDefaultPushHandler(ph PushHandler) { + ps.defaultPushHandler = ph +} + +// Broadcast sends the given push message to all peers in the PeerStore at the +// time this function is called. Note that if we fail to write the push message +// to a peer the failure is ignored. Generally this is okay, because push +// messages sent via Broadcast() should be propagated by other peers. +func (ps *PeerStore) Broadcast(push msg.Push) { + ps.lock.RLock() + defer ps.lock.RUnlock() + for _, p := range ps.peers { + p.Push(push) + } +} + +// PeerInfoHandler will handle the response to a PeerInfo request by attempting +// to establish connections with all new peers in the given response Resource. +func (ps *PeerStore) PeerInfoHandler(res *msg.Response) { + peers := res.Resource.([]string) + strPeers, _ := json.Marshal(peers) + log.Debugf("Found peers %s", string(strPeers)) + for i := 0; i < len(peers) && ps.Size() < MaxPeers; i++ { + if ps.Get(peers[i]) != nil || peers[i] == ps.ListenAddr { + // We are already connected to this peer. Skip it. + continue + } + + p, err := Connect(peers[i], ps) + if err != nil { + log.WithError(err).Errorf("Failed to dial peer %s", peers[i]) + } + log.Infof("Connected to %s", p.ListenAddr) + } +} + +// Addrs returns the list of addresses of the peers in the peerstore in the form +// : +func (ps *PeerStore) Addrs() []string { + ps.lock.RLock() + defer ps.lock.RUnlock() + addrs := make([]string, 0) + for addr := range ps.peers { + addrs = append(addrs, addr) + } + return addrs +} + +// Size synchornously returns the number of peers in the PeerStore +func (ps *PeerStore) Size() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + return len(ps.peers) +}