Skip to content

Commit

Permalink
Merge 7b7ad06 into 7eee1b7
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed Aug 3, 2017
2 parents 7eee1b7 + 7b7ad06 commit 39a61de
Show file tree
Hide file tree
Showing 21 changed files with 965 additions and 271 deletions.
189 changes: 162 additions & 27 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"errors"
"fmt"
"os"
"os/signal"
Expand All @@ -13,17 +14,15 @@ import (
"github.com/ubclaunchpad/cumulus/blockchain"
"github.com/ubclaunchpad/cumulus/conf"
"github.com/ubclaunchpad/cumulus/conn"
"github.com/ubclaunchpad/cumulus/consensus"
"github.com/ubclaunchpad/cumulus/miner"
"github.com/ubclaunchpad/cumulus/msg"
"github.com/ubclaunchpad/cumulus/peer"
"github.com/ubclaunchpad/cumulus/pool"
)

var (
logFile = os.Stdout
blockQueue = make(chan *blockchain.Block, blockQueueSize)
transactionQueue = make(chan *blockchain.Transaction, transactionQueueSize)
quitChan = make(chan bool)
logFile = os.Stdout
)

const (
Expand All @@ -33,10 +32,13 @@ const (

// App contains information about a running instance of a Cumulus node
type App struct {
CurrentUser *User
PeerStore *peer.PeerStore
Chain *blockchain.BlockChain
Pool *pool.Pool
CurrentUser *User
PeerStore *peer.PeerStore
Chain *blockchain.BlockChain
Pool *pool.Pool
blockQueue chan *blockchain.Block
transactionQueue chan *blockchain.Transaction
quitChan chan bool
}

// Run sets up and starts a new Cumulus node with the
Expand All @@ -46,11 +48,15 @@ func Run(cfg conf.Config) {
config := &cfg

addr := fmt.Sprintf("%s:%d", config.Interface, config.Port)
user := getCurrentUser()
a := App{
PeerStore: peer.NewPeerStore(addr),
CurrentUser: getCurrentUser(),
Chain: getLocalChain(),
Pool: getLocalPool(),
PeerStore: peer.NewPeerStore(addr),
CurrentUser: user,
Chain: getLocalChain(user),
Pool: getLocalPool(),
blockQueue: make(chan *blockchain.Block, blockQueueSize),
transactionQueue: make(chan *blockchain.Transaction, transactionQueueSize),
quitChan: make(chan bool),
}

// Set logging level
Expand Down Expand Up @@ -80,6 +86,11 @@ func Run(cfg conf.Config) {
// stream in. Kick off a worker to handle requests and pushes.
go a.HandleWork()

if !config.NoMiner {
// Start the miner
go a.Mine()
}

// Set Peer default Push and Request handlers. These functions will handle
// request and push messages from all peers we connect to unless overridden
// for specific peers by calls like p.SetRequestHandler(someHandler)
Expand Down Expand Up @@ -154,20 +165,19 @@ func (a *App) RequestHandler(req *msg.Request) msg.Response {
case msg.ResourcePeerInfo:
res.Resource = a.PeerStore.Addrs()
case msg.ResourceBlock:
// Block is requested by number.
blockNumber, ok := req.Params["blockNumber"].(uint32)
// Block is requested by block hash.
blockHash, ok := req.Params["lastBlockHash"].(blockchain.Hash)
if ok {
// If its ok, we make try to a copy of it.
// TODO: Make this CopyBlockByHash.
blk, err := a.Chain.CopyBlockByIndex(blockNumber)
blk, err := a.Chain.CopyBlockByLastBlockHash(blockHash)
if err != nil {
// Bad index parameter.
res.Error = notFoundErr
} else {
res.Resource = blk
}
} else {
// No index parameter.
// No blockHash parameter.
res.Error = notFoundErr
}
default:
Expand All @@ -186,15 +196,15 @@ func (a *App) PushHandler(push *msg.Push) {
blk, ok := push.Resource.(*blockchain.Block)
if ok {
log.Info("Adding block to work queue.")
blockQueue <- blk
a.blockQueue <- blk
} else {
log.Error("Could not cast resource to block.")
}
case msg.ResourceTransaction:
txn, ok := push.Resource.(*blockchain.Transaction)
if ok {
log.Info("Adding transaction to work queue.")
transactionQueue <- txn
a.transactionQueue <- txn
} else {
log.Error("Could not cast resource to transaction.")
}
Expand All @@ -204,11 +214,21 @@ func (a *App) PushHandler(push *msg.Push) {
}

// getLocalChain returns an instance of the blockchain.
func getLocalChain() *blockchain.BlockChain {
func getLocalChain(user *User) *blockchain.BlockChain {
// TODO: Look for local chain on disk. If doesn't exist, go rummaging
// around on the internets for one.
bc, _ := blockchain.NewValidTestChainAndBlock()
return bc
bc := blockchain.BlockChain{
Blocks: make([]*blockchain.Block, 0),
Head: blockchain.NilHash,
}

genisisBlock := blockchain.Genesis(user.Wallet.Public(),
consensus.CurrentTarget(), consensus.StartingBlockReward,
make([]byte, 0))

bc.Blocks = append(bc.Blocks, genisisBlock)
bc.Head = blockchain.HashSum(genisisBlock)
return &bc
}

// getLocalPool returns an instance of the pool.
Expand All @@ -222,11 +242,11 @@ func (a *App) HandleWork() {
log.Debug("Worker waiting for work.")
for {
select {
case work := <-transactionQueue:
case work := <-a.transactionQueue:
a.HandleTransaction(work)
case work := <-blockQueue:
case work := <-a.blockQueue:
a.HandleBlock(work)
case <-quitChan:
case <-a.quitChan:
return
}
}
Expand All @@ -247,8 +267,7 @@ func (a *App) HandleBlock(blk *blockchain.Block) {
validBlock := a.Pool.Update(blk, a.Chain)

if validBlock {
// Append to the chain before requesting
// the next block so that the block
// Append to the chain before requesting the next block so that the block
// numbers make sense.
a.Chain.AppendBlock(blk)
address := a.CurrentUser.Wallet.Public()
Expand All @@ -259,3 +278,119 @@ func (a *App) HandleBlock(blk *blockchain.Block) {
log.Debug("added blk number %d to chain", blk.BlockNumber)
}
}

// Mine continuously pulls transactions form the transaction pool, uses them to
// create blocks, and mines those blocks. When a block is mined it is added
// to the blockchain and broadcasted into the network. Mine() returns when
// miner.StopMining() is called.
func (a *App) Mine() {
log.Info("Starting miner")
for {
// Make a new block form the transactions in the transaction pool
blockToMine := a.Pool.NextBlock(a.Chain, a.CurrentUser.Wallet.Public(),
a.CurrentUser.BlockSize)
miningResult := miner.Mine(a.Chain, blockToMine)
if miningResult.Complete {
log.Info("Sucessfully mined a block!")
push := msg.Push{
ResourceType: msg.ResourceBlock,
Resource: blockToMine,
}
a.PeerStore.Broadcast(push)
} else if miningResult.Info == miner.MiningHalted {
log.Info("Miner stopped.")
return
}
}
}

// SyncBlockChain updates the local copy of the blockchain by requesting missing
// blocks from peers. Returns error if we are not connected to any peers.
func (a *App) SyncBlockChain() error {
var currentHeadHash blockchain.Hash
prevHead := a.Chain.LastBlock()
newBlockChan := make(chan *blockchain.Block)
errChan := make(chan *msg.ProtocolError)

// Define a handler for responses to our block requests
blockResponseHandler := func(blockResponse *msg.Response) {
if blockResponse.Error != nil {
errChan <- blockResponse.Error
return
}

block, ok := blockResponse.Resource.(blockchain.Block)
if !ok {
newBlockChan <- nil
return
}

newBlockChan <- &block
}

// Continually request the block after the latest block in our chain until
// we are totally up to date
for {
currentHead := a.Chain.LastBlock()
if currentHead == nil {
// Our blockchain is empty
currentHeadHash = blockchain.NilHash
} else {
currentHeadHash = blockchain.HashSum(currentHead)
}

reqParams := map[string]interface{}{
"lastBlockHash": currentHeadHash,
}

blockRequest := msg.Request{
ID: uuid.New().String(),
ResourceType: msg.ResourceBlock,
Params: reqParams,
}

// Pick a peer to send the request to
p := a.PeerStore.GetRandom()
if p == nil {
return errors.New("SyncBlockchain failed: no peers to request blocks from")
}
p.Request(blockRequest, blockResponseHandler)

// Wait for response
select {
case newBlock := <-newBlockChan:
if newBlock == nil {
// We received a response with no error but an invalid resource
// Try again
continue
}

valid, validationCode := consensus.VerifyBlock(a.Chain, newBlock)
if !valid {
// There is something wrong with this block. Try again
fields := log.Fields{"validationCode": validationCode}
log.WithFields(fields).Debug(
"SyncBlockchain received invalid block")
continue
}

// Valid block. Append it to the chain
a.Chain.AppendBlock(newBlock)

if (&newBlock.BlockHeader).Equal(&prevHead.BlockHeader) {
// Our blockchain is up to date
return nil
}

case err := <-errChan:
if err.Code == msg.ResourceNotFound {
// Our chain might be out of sync, roll it back by one block
// and request the next block
prevHead = a.Chain.RollBack()
}

// Some other protocol error occurred. Try again
log.WithError(err).Debug("SyncBlockChain received error response")
}
}
}
33 changes: 26 additions & 7 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"testing"
"time"

"github.com/ubclaunchpad/cumulus/miner"

log "github.com/Sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/ubclaunchpad/cumulus/blockchain"
Expand All @@ -24,7 +26,7 @@ func TestPushHandlerNewBlock(t *testing.T) {
}
a.PushHandler(&push)
select {
case blk, ok := <-blockQueue:
case blk, ok := <-a.blockQueue:
assert.True(t, ok)
assert.Equal(t, blk, b)
}
Expand All @@ -40,7 +42,7 @@ func TestPushHandlerNewTestTransaction(t *testing.T) {
}
a.PushHandler(&push)
select {
case tr, ok := <-transactionQueue:
case tr, ok := <-a.transactionQueue:
assert.True(t, ok)
assert.Equal(t, tr, txn)
}
Expand Down Expand Up @@ -160,21 +162,38 @@ func TestGetLocalPool(t *testing.T) {
}

func TestGetLocalChain(t *testing.T) {
assert.NotNil(t, getLocalChain())
assert.NotNil(t, getLocalChain(NewUser()))
}

func TestHandleBlock(t *testing.T) {
a := createNewTestApp()
go a.HandleWork()
time.Sleep(50 * time.Millisecond)
blockQueue <- blockchain.NewTestBlock()
assert.Equal(t, len(blockQueue), 0)
a.blockQueue <- blockchain.NewTestBlock()
assert.Equal(t, len(a.blockQueue), 0)
}

func TestHandleTransaction(t *testing.T) {
a := createNewTestApp()
go a.HandleWork()
time.Sleep(50 * time.Millisecond)
transactionQueue <- blockchain.NewTestTransaction()
assert.Equal(t, len(transactionQueue), 0)
a.transactionQueue <- blockchain.NewTestTransaction()
assert.Equal(t, len(a.transactionQueue), 0)
}

func TestMine(t *testing.T) {
a := createNewTestApp()
if miner.IsMining() {
t.FailNow()
}
go a.Mine()
time.Sleep(time.Second)
if !miner.IsMining() {
t.FailNow()
}
miner.StopMining()
time.Sleep(time.Second)
if miner.IsMining() {
t.FailNow()
}
}
Loading

0 comments on commit 39a61de

Please sign in to comment.