Skip to content

Commit

Permalink
Merge a660454 into b6d5e37
Browse files Browse the repository at this point in the history
  • Loading branch information
bfbachmann committed Aug 12, 2017
2 parents b6d5e37 + a660454 commit 5a48fb2
Show file tree
Hide file tree
Showing 22 changed files with 1,092 additions and 350 deletions.
277 changes: 230 additions & 47 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package app

import (
"encoding/json"
"errors"
"fmt"
"os"
"os/signal"
Expand All @@ -13,17 +15,15 @@ import (
"github.com/ubclaunchpad/cumulus/blockchain"
"github.com/ubclaunchpad/cumulus/conf"
"github.com/ubclaunchpad/cumulus/conn"
"github.com/ubclaunchpad/cumulus/consensus"
"github.com/ubclaunchpad/cumulus/miner"
"github.com/ubclaunchpad/cumulus/msg"
"github.com/ubclaunchpad/cumulus/peer"
"github.com/ubclaunchpad/cumulus/pool"
)

var (
logFile = os.Stdout
blockQueue = make(chan *blockchain.Block, blockQueueSize)
transactionQueue = make(chan *blockchain.Transaction, transactionQueueSize)
quitChan = make(chan bool)
logFile = os.Stdout
)

const (
Expand All @@ -33,10 +33,13 @@ const (

// App contains information about a running instance of a Cumulus node
type App struct {
CurrentUser *User
PeerStore *peer.PeerStore
Chain *blockchain.BlockChain
Pool *pool.Pool
CurrentUser *User
PeerStore *peer.PeerStore
Chain *blockchain.BlockChain
Pool *pool.Pool
blockQueue chan *blockchain.Block
transactionQueue chan *blockchain.Transaction
quitChan chan bool
}

// Run sets up and starts a new Cumulus node with the
Expand All @@ -46,11 +49,15 @@ func Run(cfg conf.Config) {
config := &cfg

addr := fmt.Sprintf("%s:%d", config.Interface, config.Port)
user := getCurrentUser()
a := App{
PeerStore: peer.NewPeerStore(addr),
CurrentUser: getCurrentUser(),
Chain: getLocalChain(),
Pool: getLocalPool(),
PeerStore: peer.NewPeerStore(addr),
CurrentUser: user,
Chain: getLocalChain(user),
Pool: getLocalPool(),
blockQueue: make(chan *blockchain.Block, blockQueueSize),
transactionQueue: make(chan *blockchain.Transaction, transactionQueueSize),
quitChan: make(chan bool),
}

// Set logging level
Expand Down Expand Up @@ -80,6 +87,11 @@ func Run(cfg conf.Config) {
// stream in. Kick off a worker to handle requests and pushes.
go a.HandleWork()

if config.Mine {
// Start the miner
go a.Mine()
}

// Set Peer default Push and Request handlers. These functions will handle
// request and push messages from all peers we connect to unless overridden
// for specific peers by calls like p.SetRequestHandler(someHandler)
Expand Down Expand Up @@ -121,6 +133,14 @@ func Run(cfg conf.Config) {
if len(config.Target) > 0 {
// Connect to the target and discover its peers.
a.ConnectAndDiscover(cfg.Target)

// Download blockchain
log.Info("Syncronizing blockchain")
_, err := a.SyncBlockChain()
if err != nil {
log.WithError(err).Fatal("Failed to download blockchain")
}
log.Info("Blockchain synchronization complete")
}
}

Expand Down Expand Up @@ -149,29 +169,36 @@ func (a *App) RequestHandler(req *msg.Request) msg.Response {
"Invalid resource type")
notFoundErr := msg.NewProtocolError(msg.ResourceNotFound,
"Resource not found.")
badRequestErr := msg.NewProtocolError(msg.BadRequest,
"Bad request")

switch req.ResourceType {
case msg.ResourcePeerInfo:
res.Resource = a.PeerStore.Addrs()
case msg.ResourceBlock:
// Block is requested by number.
blockNumber, ok := req.Params["blockNumber"].(uint32)
if ok {
// If its ok, we make try to a copy of it.
// TODO: Make this CopyBlockByHash.
blk, err := a.Chain.CopyBlockByIndex(blockNumber)
if err != nil {
// Bad index parameter.
res.Error = notFoundErr
} else {
res.Resource = blk
}
} else {
// No index parameter.
log.Debug("Received block request")

// Block is requested by block hash.
hashBytes, err := json.Marshal(req.Params["lastBlockHash"])
if err != nil {
res.Error = badRequestErr
break
}

var hash blockchain.Hash
err = json.Unmarshal(hashBytes, &hash)
if err != nil {
res.Error = badRequestErr
break
}

block, err := a.Chain.GetBlockByLastBlockHash(hash)
if err != nil {
res.Error = notFoundErr
} else {
res.Resource = block
}
default:
// Return err by default.
res.Error = typeErr
}

Expand All @@ -186,15 +213,15 @@ func (a *App) PushHandler(push *msg.Push) {
blk, ok := push.Resource.(*blockchain.Block)
if ok {
log.Info("Adding block to work queue.")
blockQueue <- blk
a.blockQueue <- blk
} else {
log.Error("Could not cast resource to block.")
}
case msg.ResourceTransaction:
txn, ok := push.Resource.(*blockchain.Transaction)
if ok {
log.Info("Adding transaction to work queue.")
transactionQueue <- txn
a.transactionQueue <- txn
} else {
log.Error("Could not cast resource to transaction.")
}
Expand All @@ -204,11 +231,19 @@ func (a *App) PushHandler(push *msg.Push) {
}

// getLocalChain returns an instance of the blockchain.
func getLocalChain() *blockchain.BlockChain {
func getLocalChain(user *User) *blockchain.BlockChain {
// TODO: Look for local chain on disk. If doesn't exist, go rummaging
// around on the internets for one.
bc, _ := blockchain.NewValidTestChainAndBlock()
return bc
bc := blockchain.BlockChain{
Blocks: make([]*blockchain.Block, 0),
Head: blockchain.NilHash,
}

genisisBlock := blockchain.Genesis(user.Wallet.Public(),
consensus.CurrentTarget(), consensus.StartingBlockReward, []byte{})

bc.AppendBlock(genisisBlock)
return &bc
}

// getLocalPool returns an instance of the pool.
Expand All @@ -222,11 +257,11 @@ func (a *App) HandleWork() {
log.Debug("Worker waiting for work.")
for {
select {
case work := <-transactionQueue:
case work := <-a.transactionQueue:
a.HandleTransaction(work)
case work := <-blockQueue:
case work := <-a.blockQueue:
a.HandleBlock(work)
case <-quitChan:
case <-a.quitChan:
return
}
}
Expand All @@ -236,26 +271,174 @@ func (a *App) HandleWork() {
func (a *App) HandleTransaction(txn *blockchain.Transaction) {
validTransaction := a.Pool.Push(txn, a.Chain)
if validTransaction {
log.Debug("added transaction to pool from address: " + txn.Sender.Repr())
log.Debug("Added transaction to pool from address: " + txn.Sender.Repr())
} else {
log.Debug("bad transaction rejected from sender: " + txn.Sender.Repr())
log.Debug("Bad transaction rejected from sender: " + txn.Sender.Repr())
}
}

// HandleBlock handles new instance of BlockWork.
func (a *App) HandleBlock(blk *blockchain.Block) {
log.Info("Received new block")
validBlock := a.Pool.Update(blk, a.Chain)

if validBlock {
// Append to the chain before requesting
// the next block so that the block
// numbers make sense.
a.Chain.AppendBlock(blk)
address := a.CurrentUser.Wallet.Public()
blk := a.Pool.NextBlock(a.Chain, address, a.CurrentUser.BlockSize)
if miner.IsMining() {
miner.RestartMiner(a.Chain, blk)
if !validBlock {
// The block was invalid wrt our chain. Maybe our chain is out of date.
// Update it and try again.
chainChanged, err := a.SyncBlockChain()
if chainChanged && miner.IsMining() {
miner.StopMining()
go a.Mine()
}
if err != nil {
log.WithError(err).Error("Error synchronizing blockchain")
return
}

validBlock = a.Pool.Update(blk, a.Chain)
if !validBlock {
// Synchronizing our chain didn't help, the block is still invalid.
return
}
}

// Append to the chain before requesting the next block so that the block
// numbers make sense.
a.Chain.AppendBlock(blk)
if miner.IsMining() {
miner.StopMining()
go a.Mine()
}
log.Debug("Added block number %d to chain", blk.BlockNumber)
return
}

// Mine continuously pulls transactions form the transaction pool, uses them to
// create blocks, and mines those blocks. When a block is mined it is added
// to the blockchain and broadcasted into the network. Mine() returns when
// miner.StopMining() is called.
func (a *App) Mine() {
log.Info("Starting miner")
for {
// Make a new block form the transactions in the transaction pool
blockToMine := a.Pool.NextBlock(a.Chain, a.CurrentUser.Wallet.Public(),
a.CurrentUser.BlockSize)
miningResult := miner.Mine(a.Chain, blockToMine)
if miningResult.Complete {
log.Info("Sucessfully mined a block!")
push := msg.Push{
ResourceType: msg.ResourceBlock,
Resource: blockToMine,
}
a.PeerStore.Broadcast(push)
} else if miningResult.Info == miner.MiningHalted {
log.Info("Miner stopped.")
return
}
}
}

// SyncBlockChain updates the local copy of the blockchain by requesting missing
// blocks from peers. Returns true if the blockchain changed as a result of
// calling this function, false if it didn't and an error if we are not connected
// to any peers.
func (a *App) SyncBlockChain() (bool, error) {
var currentHeadHash blockchain.Hash
prevHead := a.Chain.LastBlock()
newBlockChan := make(chan *blockchain.Block)
errChan := make(chan *msg.ProtocolError)
changed := false

// Define a handler for responses to our block requests
blockResponseHandler := func(blockResponse *msg.Response) {
if blockResponse.Error != nil {
errChan <- blockResponse.Error
return
}

blockBytes, err := json.Marshal(blockResponse.Resource)
if err != nil {
newBlockChan <- nil
return
}

block, err := blockchain.DecodeBlockJSON(blockBytes)
if err != nil {
log.WithError(err).Error("Error decoding block")
newBlockChan <- nil
return
}

newBlockChan <- block
}

// Continually request the block after the latest block in our chain until
// we are totally up to date
for {
currentHead := a.Chain.LastBlock()
if currentHead == nil {
// Our blockchain is empty
currentHeadHash = blockchain.NilHash
} else {
currentHeadHash = blockchain.HashSum(currentHead)
}

reqParams := map[string]interface{}{
"lastBlockHash": currentHeadHash,
}

blockRequest := msg.Request{
ID: uuid.New().String(),
ResourceType: msg.ResourceBlock,
Params: reqParams,
}

// Pick a peer to send the request to
p := a.PeerStore.GetRandom()
if p == nil {
return changed, errors.New(
"SyncBlockchain failed: no peers to request blocks from")
}
p.Request(blockRequest, blockResponseHandler)

// Wait for response
select {
case newBlock := <-newBlockChan:
if newBlock == nil {
// We received a response with no error but an invalid resource
// Try again
log.Debug("Received block response with invalid resource")
continue
}

valid, validationCode := consensus.VerifyBlock(a.Chain, newBlock)
if !valid {
// There is something wrong with this block. Try again
fields := log.Fields{"validationCode": validationCode}
log.WithFields(fields).Debug("SyncBlockchain received invalid block")
continue
}

// Valid block. Append it to the chain
log.Debug("Adding block to blockchain")
a.Chain.AppendBlock(newBlock)
changed = true

if (&newBlock.BlockHeader).Equal(&prevHead.BlockHeader) {
// Our blockchain is up to date
return changed, nil
}

case err := <-errChan:
if err.Code == msg.ResourceNotFound {
// Our chain might be out of sync, roll it back by one block
// and request the next block
prevHead = a.Chain.RollBack()
changed = true
}

// Some other protocol error occurred. Try again
log.WithError(err).Debug("SyncBlockChain received error response")
}
log.Debug("added blk number %d to chain", blk.BlockNumber)
}
}
Loading

0 comments on commit 5a48fb2

Please sign in to comment.