Skip to content

Commit

Permalink
Merge 815f38e into d630511
Browse files Browse the repository at this point in the history
  • Loading branch information
chadlagore committed Jul 26, 2017
2 parents d630511 + 815f38e commit 7a70431
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 461 deletions.
121 changes: 75 additions & 46 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,38 @@ import (
"github.com/ubclaunchpad/cumulus/blockchain"
"github.com/ubclaunchpad/cumulus/conf"
"github.com/ubclaunchpad/cumulus/conn"
"github.com/ubclaunchpad/cumulus/miner"
"github.com/ubclaunchpad/cumulus/msg"
"github.com/ubclaunchpad/cumulus/peer"
"github.com/ubclaunchpad/cumulus/pool"
)

var (
chain *blockchain.BlockChain
logFile = os.Stdout
// A reference to the transaction pool
tpool *pool.Pool
)

const (
blockQueueSize = 100
transactionQueueSize = 100
)

// App contains information about a running instance of a Cumulus node
type App struct {
PeerStore *peer.PeerStore
CurrentUser *User
PeerStore *peer.PeerStore
Chain *blockchain.BlockChain
Pool *pool.Pool
}

// BlockWorkQueue is a queue of blocks to process.
var blockQueue = make(chan *blockchain.Block, blockQueueSize)

// TransactionWorkQueue is a queue of transactions to process.
var transactionQueue = make(chan *blockchain.Transaction, transactionQueueSize)

// QuitChan kills the app worker.
var quitChan = make(chan bool)

// Run sets up and starts a new Cumulus node with the
// given configuration. This should only be called once (except in tests)
func Run(cfg conf.Config) {
Expand All @@ -38,7 +53,10 @@ func Run(cfg conf.Config) {

addr := fmt.Sprintf("%s:%d", config.Interface, config.Port)
a := App{
PeerStore: peer.NewPeerStore(addr),
PeerStore: peer.NewPeerStore(addr),
CurrentUser: getCurrentUser(),
Chain: getLocalChain(),
Pool: getLocalPool(),
}

// Set logging level
Expand All @@ -65,9 +83,8 @@ func Run(cfg conf.Config) {
}()

// Below we'll connect to peers. After which, requests could begin to
// stream in. We should first initalize our pool, workers to handle
// incoming messages.
initializeNode()
// stream in. Kick off a worker to handle requests and pushes.
go a.HandleWork()

// Set Peer default Push and Request handlers. These functions will handle
// request and push messages from all peers we connect to unless overridden
Expand All @@ -90,12 +107,6 @@ func Run(cfg conf.Config) {
// peer is running
go a.PeerStore.MaintainConnections(wg)

// Request the blockchain.
if chain == nil {
log.Info("Request blockchain from peers not yet implemented.")
initializeChain()
}

// Wait for goroutines to start
wg.Wait()

Expand All @@ -110,7 +121,7 @@ func Run(cfg conf.Config) {
}
log.Info("Redirecting logs to logfile")
log.SetOutput(logFile)
go RunConsole(a.PeerStore)
go RunConsole(a.PeerStore, &a)
}

if len(config.Target) > 0 {
Expand Down Expand Up @@ -153,7 +164,8 @@ func (a *App) RequestHandler(req *msg.Request) msg.Response {
blockNumber, ok := req.Params["blockNumber"].(uint32)
if ok {
// If its ok, we make try to a copy of it.
blk, err := chain.CopyBlockByIndex(blockNumber)
// TODO: Make this CopyBlockByHash.
blk, err := a.Chain.CopyBlockByIndex(blockNumber)
if err != nil {
// Bad index parameter.
res.Error = notFoundErr
Expand All @@ -180,15 +192,15 @@ func (a *App) PushHandler(push *msg.Push) {
blk, ok := push.Resource.(*blockchain.Block)
if ok {
log.Info("Adding block to work queue.")
BlockWorkQueue <- BlockWork{blk, nil}
blockQueue <- blk
} else {
log.Error("Could not cast resource to block.")
}
case msg.ResourceTransaction:
txn, ok := push.Resource.(*blockchain.Transaction)
if ok {
log.Info("Adding transaction to work queue.")
TransactionWorkQueue <- TransactionWork{txn, nil}
transactionQueue <- txn
} else {
log.Error("Could not cast resource to transaction.")
}
Expand All @@ -197,42 +209,59 @@ func (a *App) PushHandler(push *msg.Push) {
}
}

// initializeNode creates a transaction pool, workers and queues to handle
// incoming messages.
func initializeNode() {
tpool = pool.New()
intializeQueues()
initializeWorkers()
// getLocalChain returns an instance of the blockchain.
func getLocalChain() *blockchain.BlockChain {
// TODO: Look for local chain on disk. If doesn't exist, go rummaging
// around on the internets for one.
bc, _ := blockchain.NewValidTestChainAndBlock()
return bc
}

// intializeQueues makes all necessary queues.
func intializeQueues() {
BlockWorkQueue = make(chan BlockWork, BlockQueueSize)
TransactionWorkQueue = make(chan TransactionWork, TransactionQueueSize)
QuitChan = make(chan int)
// getLocalPool returns an instance of the pool.
func getLocalPool() *pool.Pool {
// TODO: Look for local pool on disk. If doesn't exist, make a new one.
return pool.New()
}

// initializeWorkers kicks off workers to handle incoming requests.
func initializeWorkers() {
for i := 0; i < nWorkers; i++ {
log.WithFields(log.Fields{"id": i}).Debug("Starting worker. ")
worker := NewWorker(i)
worker.Start()
workers[i] = &worker
// HandleWork continually collects new work from existing work channels.
func (a *App) HandleWork() {
log.Debug("Worker waiting for work.")
for {
select {
case work := <-transactionQueue:
a.HandleTransaction(work)
case work := <-blockQueue:
a.HandleBlock(work)
case <-quitChan:
return
}
}
}

// initializeChain creates the blockchain for the node.
func initializeChain() {
chain, _ = blockchain.NewValidTestChainAndBlock()
// TODO: Check if chain exists on disk.
// TODO: If not, request chain from peers.
// HandleTransaction handles new instance of TransactionWork.
func (a *App) HandleTransaction(txn *blockchain.Transaction) {
validTransaction := a.Pool.Set(txn, a.Chain)
if validTransaction {
log.Debug("added transaction to pool from address: " + txn.Sender.Repr())
} else {
log.Debug("bad transaction rejected from sender: " + txn.Sender.Repr())
}
}

// killWorkers kills all workers.
func killWorkers() {
for i := 0; i < nWorkers; i++ {
QuitChan <- i
workers[i] = nil
// HandleBlock handles new instance of BlockWork.
func (a *App) HandleBlock(blk *blockchain.Block) {
validBlock := a.Pool.Update(blk, a.Chain)

if validBlock {
// Append to the chain before requesting
// the next block so that the block
// numbers make sense.
a.Chain.AppendBlock(blk)
address := a.CurrentUser.Wallet.Public()
blk := a.Pool.NextBlock(a.Chain, address, a.CurrentUser.BlockSize)
if miner.IsMining() {
miner.RestartMiner(a.Chain, blk)
}
log.Debug("added blk number %d to chain", blk.BlockNumber)
}
}
Loading

0 comments on commit 7a70431

Please sign in to comment.