Skip to content

Commit

Permalink
Merge branch 'dev' into 77-console
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed Jul 11, 2017
2 parents f542d6f + 0ce1330 commit 0a8520d
Show file tree
Hide file tree
Showing 20 changed files with 1,896 additions and 89 deletions.
123 changes: 98 additions & 25 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ import (
"github.com/ubclaunchpad/cumulus/conn"
"github.com/ubclaunchpad/cumulus/msg"
"github.com/ubclaunchpad/cumulus/peer"
"github.com/ubclaunchpad/cumulus/pool"
)

var (
config *conf.Config
chain *blockchain.BlockChain
logFile = os.Stdout
// A reference to the transaction pool
tpool *pool.Pool
)

// Run sets up and starts a new Cumulus node with the
Expand All @@ -46,6 +49,11 @@ func Run(cfg conf.Config) {
os.Exit(0)
}()

// Below we'll connect to peers. After which, requests could begin to
// stream in. We should first initalize our pool, workers to handle
// incoming messages.
initializeNode()

// Set Peer default Push and Request handlers. These functions will handle
// request and push messages from all peers we connect to unless overridden
// for specific peers by calls like p.SetRequestHandler(someHandler)
Expand All @@ -57,9 +65,12 @@ func Run(cfg conf.Config) {
log.Infof("Starting listener on %s:%d", cfg.Interface, cfg.Port)
peer.ListenAddr = fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port)
go func() {
err := conn.Listen(fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port), peer.ConnectionHandler)
address := fmt.Sprintf("%s:%d", cfg.Interface, cfg.Port)
err := conn.Listen(address, peer.ConnectionHandler)
if err != nil {
log.WithError(err).Fatalf("Failed to listen on %s:%d", cfg.Interface, cfg.Port)
log.WithError(
err,
).Fatalf("Failed to listen on %s:%d", cfg.Interface, cfg.Port)
}
}()

Expand All @@ -74,42 +85,54 @@ func Run(cfg conf.Config) {
go RunConsole()
}

// If a target peer was supplied, connect to it and try discover and connect
// to its peers
if len(cfg.Target) > 0 {
peerInfoRequest := msg.Request{
ID: uuid.New().String(),
ResourceType: msg.ResourcePeerInfo,
}

log.Infof("Dialing target %s", cfg.Target)
c, err := conn.Dial(cfg.Target)
if err != nil {
log.WithError(err).Fatalf("Failed to connect to target")
}
peer.ConnectionHandler(c)
p := peer.PStore.Get(c.RemoteAddr().String())
p.Request(peerInfoRequest, peer.PeerInfoHandler)
if len(config.Target) > 0 {
// Connect to the target and discover its peers.
ConnectAndDiscover(cfg.Target)
}

// Try maintain as close to peer.MaxPeers connections as possible while this
// peer is running
go peer.MaintainConnections()

// Request the blockchain.
log.Info("Requesting blockchain from peers... ")
RequestBlockChain()

// Return to command line.
select {} // Hang main thread. Everything happens in goroutines from here
}

// RequestHandler is called every time a peer sends us a request message expect
// on peers whos PushHandlers have been overridden.
// ConnectAndDiscover tries to connect to a target and discover its peers.
func ConnectAndDiscover(target string) {
peerInfoRequest := msg.Request{
ID: uuid.New().String(),
ResourceType: msg.ResourcePeerInfo,
}

log.Infof("Dialing target %s", target)
c, err := conn.Dial(target)
if err != nil {
log.WithError(err).Fatalf("Failed to connect to target")
}
peer.ConnectionHandler(c)
p := peer.PStore.Get(c.RemoteAddr().String())
p.Request(peerInfoRequest, peer.PeerInfoHandler)
}

// RequestHandler is called every time a peer sends us a request message except
// on peers whos RequestHandlers have been overridden.
func RequestHandler(req *msg.Request) msg.Response {
res := msg.Response{ID: req.ID}

switch req.ResourceType {
case msg.ResourcePeerInfo:
res.Resource = peer.PStore.Addrs()
log.Debugf("Returning PeerInfo %s", res.Resource)
case msg.ResourceBlock, msg.ResourceTransaction:
res.Error = msg.NewProtocolError(msg.NotImplemented,
"Block and Transaction requests are not yet implemented on this peer")
case msg.ResourceBlock:
work := BlockWork{}
BlockWorkQueue <- work
case msg.ResourceTransaction:
work := TransactionWork{}
TransactionWorkQueue <- work
default:
res.Error = msg.NewProtocolError(msg.InvalidResourceType,
"Invalid resource type")
Expand All @@ -118,13 +141,63 @@ func RequestHandler(req *msg.Request) msg.Response {
return res
}

// PushHandler is called every time a peer sends us a Push message expect on
// PushHandler is called every time a peer sends us a Push message except on
// peers whos PushHandlers have been overridden.
func PushHandler(push *msg.Push) {
switch push.ResourceType {
case msg.ResourceBlock:
blk, ok := push.Resource.(*blockchain.Block)
if ok {
log.Info("Adding block to work queue.")
BlockWorkQueue <- BlockWork{blk, nil}
} else {
log.Error("Could not cast resource to block.")
}
case msg.ResourceTransaction:
txn, ok := push.Resource.(*blockchain.Transaction)
if ok {
log.Info("Adding transaction to work queue.")
TransactionWorkQueue <- TransactionWork{txn, nil}
} else {
log.Error("Could not cast resource to transaction.")
}
default:
// Invalid resource type. Ignore
}
}

// initializeNode creates a transaction pool, workers and queues to handle
// incoming messages.
func initializeNode() {
tpool = pool.New()
intializeQueues()
initializeWorkers()
}

// intializeQueues makes all necessary queues.
func intializeQueues() {
BlockWorkQueue = make(chan BlockWork, BlockQueueSize)
TransactionWorkQueue = make(chan TransactionWork, TransactionQueueSize)
QuitChan = make(chan int)
}

// initializeWorkers kicks off workers to handle incoming requests.
func initializeWorkers() {
for i := 0; i < nWorkers; i++ {
log.WithFields(log.Fields{"id": i}).Debug("Starting worker. ")
worker := NewWorker(i)
worker.Start()
workers[i] = &worker
}
}

// killWorkers kills all workers.
func killWorkers() {
for i := 0; i < nWorkers; i++ {
QuitChan <- i
workers[i] = nil
}
}

// RequestBlockChain asks existing peers for a copy of the blockchain.
func RequestBlockChain() {}
118 changes: 118 additions & 0 deletions app/app_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package app

import (
"testing"
"time"

log "github.com/Sirupsen/logrus"
"github.com/ubclaunchpad/cumulus/blockchain"
"github.com/ubclaunchpad/cumulus/msg"
)

func init() {
log.SetLevel(log.InfoLevel)
}

func TestKillWorkers(t *testing.T) {
intializeQueues()
time.Sleep(20 * time.Millisecond)
for i := 0; i < nWorkers; i++ {
if workers[i] != nil {
t.FailNow()
}
}
initializeWorkers()
time.Sleep(20 * time.Millisecond)
for i := 0; i < nWorkers; i++ {
if workers[i] == nil {
t.FailNow()
}
}
killWorkers()
time.Sleep(20 * time.Millisecond)
for i := 0; i < nWorkers; i++ {
if workers[i] != nil {
t.FailNow()
}
}
}

func TestInitializeNode(t *testing.T) {
initializeNode()
if tpool == nil {
t.FailNow()
}
if BlockWorkQueue == nil {
t.FailNow()
}
if TransactionWorkQueue == nil {
t.FailNow()
}
killWorkers()
}

func TestPushHandlerNewBlock(t *testing.T) {
intializeQueues()
_, b := blockchain.NewValidTestChainAndBlock()
push := msg.Push{
ResourceType: msg.ResourceBlock,
Resource: b,
}
PushHandler(&push)
select {
case work, ok := <-BlockWorkQueue:
if !ok {
t.FailNow()
}
if work.Block != b {
t.FailNow()
}
}
// Add more here...
}

func TestPushHandlerNewTestTransaction(t *testing.T) {
intializeQueues()
txn := blockchain.NewTestTransaction()
push := msg.Push{
ResourceType: msg.ResourceTransaction,
Resource: txn,
}
PushHandler(&push)
select {
case work, ok := <-TransactionWorkQueue:
if !ok {
t.FailNow()
}
if work.Transaction != txn {
t.FailNow()
}
}
// Add more here...
}

func TestRequestHandlerNewBlock(t *testing.T) {
intializeQueues()
push := msg.Request{ResourceType: msg.ResourceBlock}
RequestHandler(&push)
select {
case _, ok := <-BlockWorkQueue:
if !ok {
t.FailNow()
}
}
// Add more here...
}

func TestRequestHandlerNewTestTransaction(t *testing.T) {
intializeQueues()
push := msg.Request{ResourceType: msg.ResourceTransaction}
RequestHandler(&push)
select {
case _, ok := <-TransactionWorkQueue:
if !ok {
t.FailNow()
}
}
// Add more here...
}
37 changes: 37 additions & 0 deletions app/work.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package app

import "github.com/ubclaunchpad/cumulus/blockchain"

const (
// BlockQueueSize is the size of the BlockQueue channel.
BlockQueueSize = 100
// TransactionQueueSize is the size of the BlockQueue channel.
TransactionQueueSize = 100
)

// Responder is used to handle requests who require a response.
type Responder interface {
Send(ok bool)
Lock()
Unlock()
}

// BlockWorkQueue is a queue of blocks to process.
var BlockWorkQueue = make(chan BlockWork, BlockQueueSize)

// TransactionWorkQueue is a queue of transactions to process.
var TransactionWorkQueue = make(chan TransactionWork, TransactionQueueSize)

// TransactionWork holds a new transaction job, and a Responder for
// sending results.
type TransactionWork struct {
*blockchain.Transaction
Responder
}

// BlockWork holds a new block job, and a Responder for
// sending results.
type BlockWork struct {
*blockchain.Block
Responder
}
Loading

0 comments on commit 0a8520d

Please sign in to comment.