Skip to content

Commit

Permalink
Merge branch '57-message-handling' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed Jun 17, 2017
2 parents 1b2107c + 9713590 commit 7cba911
Show file tree
Hide file tree
Showing 11 changed files with 925 additions and 452 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ _testmain.go

# Glide dependencies
vendor

# Text Editors and IDEs
.vscode
101 changes: 101 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package app

import (
"fmt"

log "github.com/Sirupsen/logrus"
"github.com/google/uuid"

"github.com/ubclaunchpad/cumulus/blockchain"
"github.com/ubclaunchpad/cumulus/conf"
"github.com/ubclaunchpad/cumulus/conn"
"github.com/ubclaunchpad/cumulus/msg"
"github.com/ubclaunchpad/cumulus/peer"
)

var (
config *conf.Config
// TODO peer store once it's merged in
chain *blockchain.BlockChain
)

// Run sets up and starts a new Cumulus node with the
// given configuration.
func Run(cfg conf.Config) {
log.Info("Starting Cumulus node")
config = &cfg

// Set Peer default Push and Request handlers. These functions will handle
// request and push messages from all peers we connect to unless overridden
// for specific peers by calls like p.SetRequestHandler(someHandler)
peer.SetDefaultPushHandler(PushHandler)
peer.SetDefaultRequestHandler(RequestHandler)

// Start listening on the given interface and port so we can receive
// conenctions from other peers
log.Infof("Starting listener on %s:%d", cfg.Interface, cfg.Port)
peer.LocalAddr = fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port)
go func() {
err := conn.Listen(fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port), peer.ConnectionHandler)
if err != nil {
log.WithError(err).Fatalf("Failed to listen on %s:%d", cfg.Interface, cfg.Port)
}
}()

// If a target peer was supplied, connect to it and try discover and connect
// to its peers
if len(cfg.Target) > 0 {
peerInfoRequest := msg.Request{
ID: uuid.New().String(),
ResourceType: msg.ResourcePeerInfo,
}

log.Infof("Dialing target %s", cfg.Target)
c, err := conn.Dial(cfg.Target)
if err != nil {
log.WithError(err).Fatalf("Failed to connect to target")
}
peer.ConnectionHandler(c)
p := peer.PStore.Get(c.RemoteAddr().String())
p.Request(peerInfoRequest, peer.PeerInfoHandler)
}

// Try maintain as close to peer.MaxPeers connections as possible while this
// peer is running
go peer.MaintainConnections()
select {}
}

// RequestHandler is called every time a peer sends us a request message expect
// on peers whos PushHandlers have been overridden.
func RequestHandler(req *msg.Request) msg.Response {
res := msg.Response{ID: req.ID}

switch req.ResourceType {
case msg.ResourcePeerInfo:
res.Resource = peer.PStore.Addrs()
case msg.ResourceBlock, msg.ResourceTransaction:
res.Error = msg.NewProtocolError(msg.NotImplemented,
"Block and Transaction requests are not yet implemented on this peer")
default:
res.Error = msg.NewProtocolError(msg.InvalidResourceType,
"Invalid resource type")
}

return res
}

// PushHandler is called every time a peer sends us a Push message expect on
// peers whos PushHandlers have been overridden.
func PushHandler(push *msg.Push) {
switch push.ResourceType {
case msg.ResourceBlock:
case msg.ResourceTransaction:
default:
// Invalid resource type. Ignore
}

// Ask target for its peers
// Connect to these peers until we have enough peers
// Download the blockchain
}
61 changes: 11 additions & 50 deletions cmd/run.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package cmd

import (
"io/ioutil"

log "github.com/Sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/ubclaunchpad/cumulus/app"
"github.com/ubclaunchpad/cumulus/conf"
"github.com/ubclaunchpad/cumulus/peer"
)

Expand All @@ -16,10 +15,16 @@ var runCmd = &cobra.Command{
If a target is not provided, listen for incoming connections.`,
Run: func(cmd *cobra.Command, args []string) {
port, _ := cmd.Flags().GetInt("port")
ip, _ := cmd.Flags().GetString("interface")
iface, _ := cmd.Flags().GetString("interface")
target, _ := cmd.Flags().GetString("target")
verbose, _ := cmd.Flags().GetBool("verbose")
run(port, ip, target, verbose)
config := conf.Config{
Interface: iface,
Port: uint16(port),
Target: target,
Verbose: verbose,
}
app.Run(config)
},
}

Expand All @@ -36,50 +41,6 @@ func init() {
// is called directly, e.g.:
runCmd.Flags().IntP("port", "p", peer.DefaultPort, "Port to bind to")
runCmd.Flags().StringP("interface", "i", peer.DefaultIP, "IP address to listen on")
runCmd.Flags().StringP("target", "t", "", "Multiaddress of peer to connect to")
runCmd.Flags().StringP("target", "t", "", "Address of peer to connect to")
runCmd.Flags().BoolP("verbose", "v", false, "Enable verbose logging")
}

func run(port int, ip, target string, verbose bool) {
log.Info("Starting Cumulus Peer")

if verbose {
log.SetLevel(log.DebugLevel)
}

// Set up a new host on the Cumulus network
host, err := peer.New(ip, port)
if err != nil {
log.Fatal(err)
}

// Set the host StreamHandler for the Cumulus Protocol and use
// BasicStreamHandler as its StreamHandler.
host.SetStreamHandler(peer.CumulusProtocol, host.Receive)
if target == "" {
// No target was specified, wait for incoming connections
log.Info("No target provided. Listening for incoming connections...")
select {} // Hang until someone connects to us
}

stream, err := host.Connect(target)
if err != nil {
log.WithError(err).Fatal("Error connecting to target: ", target)
}

// Send a message to the peer
_, err = stream.Write([]byte("Hello, world!"))
if err != nil {
log.WithError(err).Fatal("Error sending a message to the peer")
}

// Read the reply from the peer
reply, err := ioutil.ReadAll(stream)
if err != nil {
log.WithError(err).Fatal("Error reading a message from the peer")
}

log.Debugf("Peer %s read reply: %s", host.ID(), string(reply))

host.Close()
}
17 changes: 17 additions & 0 deletions conf/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package conf

// Config contains all configuration options for a node.
type Config struct {
// The interface to listen on for new connections.
Interface string
// The port to listen on for new connections.
Port uint16
// The address of the ingress node we should use to connect
// to the network.
Target string

// Whether or not to enable verbose logging.
Verbose bool
// Whether or not to participate in mining new blocks.
Mine bool
}
4 changes: 3 additions & 1 deletion conn/conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package conn

import "net"
import (
"net"
)

// Dial opens a connection to a remote host. `host` should be a string
// in the format <IP>|<hostname>:<port>
Expand Down

0 comments on commit 7cba911

Please sign in to comment.