Skip to content

Commit

Permalink
Merge 7b66c38 into 0c6862b
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed Jul 22, 2017
2 parents 0c6862b + 7b66c38 commit a7a04e7
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 305 deletions.
74 changes: 44 additions & 30 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"

log "github.com/Sirupsen/logrus"
Expand All @@ -18,24 +19,38 @@ import (
)

var (
config *conf.Config
chain *blockchain.BlockChain
logFile = os.Stdout
// A reference to the transaction pool
tpool *pool.Pool
)

// App contains information about a running instance of a Cumulus node
type App struct {
PeerStore *peer.PeerStore
}

// Run sets up and starts a new Cumulus node with the
// given configuration.
// given configuration. This should only be called once (except in tests)
func Run(cfg conf.Config) {
log.Info("Starting Cumulus node")
config = &cfg
config := &cfg

addr := fmt.Sprintf("%s:%d", config.Interface, config.Port)
a := App{
PeerStore: peer.NewPeerStore(addr),
}

// Set logging level
if cfg.Verbose {
log.SetLevel(log.DebugLevel)
}

// We'll need to wait on at least 2 goroutines (Listen and
// MaintainConnections) to start before returning
wg := &sync.WaitGroup{}
wg.Add(2)

// Start a goroutine that waits for program termination. Before the program
// exits it will flush logs and save the blockchain.
c := make(chan os.Signal, 1)
Expand All @@ -57,73 +72,72 @@ func Run(cfg conf.Config) {
// Set Peer default Push and Request handlers. These functions will handle
// request and push messages from all peers we connect to unless overridden
// for specific peers by calls like p.SetRequestHandler(someHandler)
peer.SetDefaultPushHandler(PushHandler)
peer.SetDefaultRequestHandler(RequestHandler)
a.PeerStore.SetDefaultPushHandler(a.PushHandler)
a.PeerStore.SetDefaultRequestHandler(a.RequestHandler)

// Start listening on the given interface and port so we can receive
// conenctions from other peers
log.Infof("Starting listener on %s:%d", cfg.Interface, cfg.Port)
peer.ListenAddr = fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port)
a.PeerStore.ListenAddr = addr
go func() {
address := fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port)
err := conn.Listen(address, peer.ConnectionHandler)
err := conn.Listen(addr, a.PeerStore.ConnectionHandler, wg)
if err != nil {
log.WithError(
err,
).Fatalf("Failed to listen on %s:%d", cfg.Interface, cfg.Port)
log.WithError(err).Fatalf("Failed to listen on %s", addr)
}
}()

// If the console flag was passed, redirect logs to a file and run the console
// NOTE: if the log file already exists we will exit with a fatal error here!
// This should stop people from running multiple Cumulus nodes that will try
// to log to the same file.
if cfg.Console {
logFile, err := os.OpenFile("logfile", os.O_WRONLY|os.O_CREATE, 0755)
logFile, err := os.OpenFile("logfile", os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0755)
if err != nil {
log.WithError(err).Fatal("Failed to redirect logs to log file")
log.WithError(err).Fatal("Failed to redirect logs to file")
}
log.Warn("Redirecting logs to logfile")
log.Warn("Redirecting logs to file")
log.SetOutput(logFile)
go RunConsole()
wg.Add(1)
go RunConsole(a.PeerStore, wg)
}

if len(config.Target) > 0 {
// Connect to the target and discover its peers.
ConnectAndDiscover(cfg.Target)
a.ConnectAndDiscover(cfg.Target)
}

// Try maintain as close to peer.MaxPeers connections as possible while this
// peer is running
go peer.MaintainConnections()
go a.PeerStore.MaintainConnections(wg)

// Request the blockchain.
if chain == nil {
log.Info("Request blockchain from peers not yet implemented.")
initializeChain()
}

// Return to command line.
select {} // Hang main thread. Everything happens in goroutines from here
// Wait for goroutines to start
wg.Wait()
}

// ConnectAndDiscover tries to connect to a target and discover its peers.
func ConnectAndDiscover(target string) {
func (a App) ConnectAndDiscover(target string) {
peerInfoRequest := msg.Request{
ID: uuid.New().String(),
ResourceType: msg.ResourcePeerInfo,
}

log.Infof("Dialing target %s", target)
c, err := conn.Dial(target)
p, err := peer.Connect(target, a.PeerStore)
if err != nil {
log.WithError(err).Fatalf("Failed to connect to target")
log.WithError(err).Fatal("Failed to dial target")
}
peer.ConnectionHandler(c)
p := peer.PStore.Get(c.RemoteAddr().String())
p.Request(peerInfoRequest, peer.PeerInfoHandler)
p.Request(peerInfoRequest, a.PeerStore.PeerInfoHandler)
}

// RequestHandler is called every time a peer sends us a request message except
// on peers whos RequestHandlers have been overridden.
func RequestHandler(req *msg.Request) msg.Response {
// RequestHandler is called every time a peer sends us a request message expect
// on peers whos PushHandlers have been overridden.
func (a App) RequestHandler(req *msg.Request) msg.Response {
res := msg.Response{ID: req.ID}

// Build some error types.
Expand All @@ -134,7 +148,7 @@ func RequestHandler(req *msg.Request) msg.Response {

switch req.ResourceType {
case msg.ResourcePeerInfo:
res.Resource = peer.PStore.Addrs()
res.Resource = a.PeerStore.Addrs()
case msg.ResourceBlock:
// Block is requested by number.
blockNumber, ok := req.Params["blockNumber"].(uint32)
Expand All @@ -161,7 +175,7 @@ func RequestHandler(req *msg.Request) msg.Response {

// PushHandler is called every time a peer sends us a Push message except on
// peers whos PushHandlers have been overridden.
func PushHandler(push *msg.Push) {
func (a App) PushHandler(push *msg.Push) {
switch push.ResourceType {
case msg.ResourceBlock:
blk, ok := push.Resource.(*blockchain.Block)
Expand Down
23 changes: 17 additions & 6 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/ubclaunchpad/cumulus/blockchain"
"github.com/ubclaunchpad/cumulus/msg"
"github.com/ubclaunchpad/cumulus/peer"
)

func init() {
Expand Down Expand Up @@ -68,7 +69,10 @@ func TestPushHandlerNewBlock(t *testing.T) {
ResourceType: msg.ResourceBlock,
Resource: b,
}
PushHandler(&push)
a := App{
PeerStore: peer.NewPeerStore("127.0.0.1:8000"),
}
a.PushHandler(&push)
select {
case work, ok := <-BlockWorkQueue:
if !ok {
Expand All @@ -88,7 +92,10 @@ func TestPushHandlerNewTestTransaction(t *testing.T) {
ResourceType: msg.ResourceTransaction,
Resource: txn,
}
PushHandler(&push)
a := App{
PeerStore: peer.NewPeerStore("127.0.0.1:8000"),
}
a.PushHandler(&push)
select {
case work, ok := <-TransactionWorkQueue:
if !ok {
Expand All @@ -103,12 +110,13 @@ func TestPushHandlerNewTestTransaction(t *testing.T) {

func TestRequestHandlerNewBlockOK(t *testing.T) {
initializeChain()
a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")}

// Set up a request (requesting block 0)
blockNumber := uint32(0)
req := createNewBlockRequest(blockNumber)

resp := RequestHandler(req)
resp := a.RequestHandler(req)
block, ok := resp.Resource.(*blockchain.Block)

// Assertion time!
Expand All @@ -119,12 +127,13 @@ func TestRequestHandlerNewBlockOK(t *testing.T) {

func TestRequestHandlerNewBlockBadParams(t *testing.T) {
initializeChain()
a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")}

// Set up a request.
blockNumber := "definitelynotanindex"
req := createNewBlockRequest(blockNumber)

resp := RequestHandler(req)
resp := a.RequestHandler(req)
block, ok := resp.Resource.(*blockchain.Block)

// Make sure request failed.
Expand All @@ -134,12 +143,13 @@ func TestRequestHandlerNewBlockBadParams(t *testing.T) {

func TestRequestHandlerNewBlockBadType(t *testing.T) {
initializeChain()
a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")}

// Set up a request.
req := createNewBlockRequest("doesntmatter")
req.ResourceType = 25

resp := RequestHandler(req)
resp := a.RequestHandler(req)
block, ok := resp.Resource.(*blockchain.Block)

// Make sure request failed.
Expand All @@ -149,12 +159,13 @@ func TestRequestHandlerNewBlockBadType(t *testing.T) {

func TestRequestHandlerPeerInfo(t *testing.T) {
initializeChain()
a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")}

// Set up a request.
req := createNewBlockRequest("doesntmatter")
req.ResourceType = msg.ResourcePeerInfo

resp := RequestHandler(req)
resp := a.RequestHandler(req)
res := resp.Resource

// Make sure request did not fail.
Expand Down
31 changes: 18 additions & 13 deletions app/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,29 @@ package app

import (
"strconv"
"sync"

"github.com/abiosoft/ishell"
"github.com/ubclaunchpad/cumulus/blockchain"
"github.com/ubclaunchpad/cumulus/conn"
"github.com/ubclaunchpad/cumulus/peer"
)

var shell *ishell.Shell
var (
shell *ishell.Shell
peerStore *peer.PeerStore
)

// RunConsole starts the Cumulus console. This should be run only once as a
// goroutine, and logging should be redirected away from stdout before it is run.
func RunConsole() {
// It takes a pointer to a PeerStore so we can use the PeerStore to interact
// with other peers and give the user info about the running instance.
func RunConsole(ps *peer.PeerStore, wg *sync.WaitGroup) {
// Wait for services to start before we star the console
wg.Wait()

peerStore = ps
shell = ishell.New()

shell.AddCmd(&ishell.Cmd{
Name: "create",
Help: "create a new wallet hash or transaction",
Expand All @@ -26,7 +36,7 @@ func RunConsole() {
Func: check,
})
shell.AddCmd(&ishell.Cmd{
Name: "listen-address",
Name: "address",
Help: "show the address this host is listening on",
Func: listenAddr,
})
Expand Down Expand Up @@ -83,11 +93,11 @@ func check(ctx *ishell.Context) {
}

func listenAddr(ctx *ishell.Context) {
shell.Println("Listening on", peer.ListenAddr)
shell.Println("Listening on", peerStore.ListenAddr)
}

func peers(tcx *ishell.Context) {
shell.Println("Connected to", peer.PStore.Addrs())
shell.Println("Connected to", peerStore.Addrs())
}

func connect(ctx *ishell.Context) {
Expand All @@ -97,14 +107,9 @@ func connect(ctx *ishell.Context) {
}

addr := ctx.Args[0]
c, err := conn.Dial(addr)
_, err := peer.Connect(addr, peerStore)
if err != nil {
shell.Println("Failed to dial peer", addr, ":", err)
return
}
peer.ConnectionHandler(c)
if peer.PStore.Get(addr) == nil {
shell.Println("Failed to extablish connection. See logs for details.")
shell.Println("Failed to extablish connection:", err)
} else {
shell.Println("Connected to", addr)
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ var runCmd = &cobra.Command{
Verbose: verbose,
Console: console,
}

// Start the application
app.Run(config)

// Hang main thread. Everything happens in goroutines from here
select {}
},
}

Expand Down
5 changes: 4 additions & 1 deletion conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package conn

import (
"net"
"sync"
)

// Dial opens a connection to a remote host. `host` should be a string
Expand All @@ -12,8 +13,10 @@ func Dial(host string) (net.Conn, error) {

// Listen binds to a TCP port and waits for incoming connections.
// When a connection is accepted, dispatches to the handler.
func Listen(iface string, handler func(net.Conn)) error {
// Calls Done on waitgroup to signal that we are now listening.
func Listen(iface string, handler func(net.Conn), wg *sync.WaitGroup) error {
listener, err := net.Listen("tcp", iface)
wg.Done()
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestConnect(t *testing.T) {

func TestListen(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(5)
wg.Add(6)

handler := func(c net.Conn) {
defer c.Close()
Expand All @@ -33,7 +33,7 @@ func TestListen(t *testing.T) {
wg.Done()
}

go Listen(":8080", handler)
go Listen(":8080", handler, &wg)
// Sleep to guarantee that our listener is ready when we start making connections
time.Sleep(time.Millisecond)

Expand Down
Loading

0 comments on commit a7a04e7

Please sign in to comment.