From 376ed69c53401814324560c9b8a7f4801bc98b19 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Sun, 23 Jul 2017 19:20:45 -0700 Subject: [PATCH 01/11] initial refactor --- app/app.go | 46 +++++---------------- app/console.go | 32 +++++++++------ app/user.go | 16 +------- app/work.go | 65 ++++++++++++++++++++++++++++-- app/worker.go | 98 --------------------------------------------- app/worker_test.go | 5 ++- blockchain/block.go | 3 ++ 7 files changed, 100 insertions(+), 165 deletions(-) delete mode 100644 app/worker.go diff --git a/app/app.go b/app/app.go index 6c3d726..7856883 100644 --- a/app/app.go +++ b/app/app.go @@ -27,7 +27,8 @@ var ( // App contains information about a running instance of a Cumulus node type App struct { - PeerStore *peer.PeerStore + PeerStore *peer.PeerStore + CurrentUser *User } // Run sets up and starts a new Cumulus node with the @@ -38,7 +39,8 @@ func Run(cfg conf.Config) { addr := fmt.Sprintf("%s:%d", config.Interface, config.Port) a := App{ - PeerStore: peer.NewPeerStore(addr), + PeerStore: peer.NewPeerStore(addr), + CurrentUser: NewUser(), } // Set logging level @@ -65,9 +67,9 @@ func Run(cfg conf.Config) { }() // Below we'll connect to peers. After which, requests could begin to - // stream in. We should first initalize our pool, workers to handle + // stream in. We should first initalize our pool, worker to handle // incoming messages. - initializeNode() + initializeNode(&a) // Set Peer default Push and Request handlers. These functions will handle // request and push messages from all peers we connect to unless overridden @@ -110,7 +112,7 @@ func Run(cfg conf.Config) { } log.Info("Redirecting logs to logfile") log.SetOutput(logFile) - go RunConsole(a.PeerStore) + go RunConsole(a.PeerStore, &a) } if len(config.Target) > 0 { @@ -180,7 +182,7 @@ func (a *App) PushHandler(push *msg.Push) { blk, ok := push.Resource.(*blockchain.Block) if ok { log.Info("Adding block to work queue.") - BlockWorkQueue <- BlockWork{blk, nil} + blockWorkQueue <- BlockWork{blk, nil} } else { log.Error("Could not cast resource to block.") } @@ -188,7 +190,7 @@ func (a *App) PushHandler(push *msg.Push) { txn, ok := push.Resource.(*blockchain.Transaction) if ok { log.Info("Adding transaction to work queue.") - TransactionWorkQueue <- TransactionWork{txn, nil} + transactionWorkQueue <- TransactionWork{txn, nil} } else { log.Error("Could not cast resource to transaction.") } @@ -199,27 +201,9 @@ func (a *App) PushHandler(push *msg.Push) { // initializeNode creates a transaction pool, workers and queues to handle // incoming messages. -func initializeNode() { +func initializeNode(app *App) { tpool = pool.New() - intializeQueues() - initializeWorkers() -} - -// intializeQueues makes all necessary queues. -func intializeQueues() { - BlockWorkQueue = make(chan BlockWork, BlockQueueSize) - TransactionWorkQueue = make(chan TransactionWork, TransactionQueueSize) - QuitChan = make(chan int) -} - -// initializeWorkers kicks off workers to handle incoming requests. -func initializeWorkers() { - for i := 0; i < nWorkers; i++ { - log.WithFields(log.Fields{"id": i}).Debug("Starting worker. ") - worker := NewWorker(i) - worker.Start() - workers[i] = &worker - } + go HandleWork(app) } // initializeChain creates the blockchain for the node. @@ -228,11 +212,3 @@ func initializeChain() { // TODO: Check if chain exists on disk. // TODO: If not, request chain from peers. } - -// killWorkers kills all workers. -func killWorkers() { - for i := 0; i < nWorkers; i++ { - QuitChan <- i - workers[i] = nil - } -} diff --git a/app/console.go b/app/console.go index f92b669..dcbef47 100644 --- a/app/console.go +++ b/app/console.go @@ -18,14 +18,16 @@ var ( // goroutine, and logging should be redirected away from stdout before it is run. // It takes a pointer to a PeerStore so we can use the PeerStore to interact // with other peers and give the user info about the running instance. -func RunConsole(ps *peer.PeerStore) { +func RunConsole(ps *peer.PeerStore, app *App) { peerStore = ps shell = ishell.New() shell.AddCmd(&ishell.Cmd{ Name: "create", Help: "create a new wallet hash or transaction", - Func: create, + Func: func(ctx *ishell.Context) { + create(ctx, app) + }, }) shell.AddCmd(&ishell.Cmd{ Name: "check", @@ -35,30 +37,36 @@ func RunConsole(ps *peer.PeerStore) { shell.AddCmd(&ishell.Cmd{ Name: "address", Help: "show the address this host is listening on", - Func: listenAddr, + Func: func(ctx *ishell.Context) { + listenAddr(ctx, peerStore) + }, }) shell.AddCmd(&ishell.Cmd{ Name: "peers", Help: "show the peers this host is connected to", - Func: peers, + Func: func(ctx *ishell.Context) { + peers(ctx, peerStore) + }, }) shell.AddCmd(&ishell.Cmd{ Name: "connect", Help: "connect to another peer", - Func: connect, + Func: func(ctx *ishell.Context) { + connect(ctx, peerStore) + }, }) shell.Start() emoji.Println(":cloud: Welcome to the :sunny: Cumulus console :cloud:") } -func create(ctx *ishell.Context) { +func create(ctx *ishell.Context, app *App) { choice := ctx.MultiChoice([]string{ "Wallet", "Transaction", }, "What would you like to create?") if choice == 0 { - createHotWallet(ctx) + createHotWallet(ctx, app) } else { shell.Print("Sender wallet ID: ") senderID := shell.ReadLine() @@ -89,15 +97,15 @@ func check(ctx *ishell.Context) { } } -func listenAddr(ctx *ishell.Context) { +func listenAddr(ctx *ishell.Context, peerStore *peer.PeerStore) { shell.Println("Listening on", peerStore.ListenAddr) } -func peers(tcx *ishell.Context) { +func peers(tcx *ishell.Context, peerStore *peer.PeerStore) { shell.Println("Connected to", peerStore.Addrs()) } -func connect(ctx *ishell.Context) { +func connect(ctx *ishell.Context, peerStore *peer.PeerStore) { if len(ctx.Args) == 0 { shell.Println("Usage: connect [IP address]:[TCP port]") return @@ -112,11 +120,11 @@ func connect(ctx *ishell.Context) { } } -func createHotWallet(ctx *ishell.Context) { +func createHotWallet(ctx *ishell.Context, app *App) { shell.Print("Enter wallet name: ") walletName := shell.ReadLine() wallet := HotWallet{walletName, blockchain.NewWallet()} - currentUser.HotWallet = wallet + app.CurrentUser.HotWallet = wallet emoji.Println(":credit_card: New hot wallet created!") emoji.Println(":raising_hand: Name: " + wallet.Name) emoji.Println(":mailbox: Address: " + wallet.Wallet.Public().Repr()) diff --git a/app/user.go b/app/user.go index 8d00f7f..9c078a1 100644 --- a/app/user.go +++ b/app/user.go @@ -14,15 +14,6 @@ type HotWallet struct { blockchain.Wallet } -var currentUser *User - -const defaultBlockSize = 1 << 18 - -func init() { - // Temporary to create a new user for testing. - currentUser = NewUser() -} - // NewUser creates a new user func NewUser() *User { return &User{ @@ -30,11 +21,6 @@ func NewUser() *User { Wallet: blockchain.NewWallet(), Name: "default", }, - BlockSize: defaultBlockSize, + BlockSize: blockchain.DefaultBlockSize, } } - -// GetCurrentUser returns the current user. -func GetCurrentUser() *User { - return currentUser -} diff --git a/app/work.go b/app/work.go index fd8731a..2afb8e6 100644 --- a/app/work.go +++ b/app/work.go @@ -1,6 +1,11 @@ package app -import "github.com/ubclaunchpad/cumulus/blockchain" +import ( + "github.com/ubclaunchpad/cumulus/blockchain" + "github.com/ubclaunchpad/cumulus/miner" + + log "github.com/Sirupsen/logrus" +) const ( // BlockQueueSize is the size of the BlockQueue channel. @@ -17,10 +22,13 @@ type Responder interface { } // BlockWorkQueue is a queue of blocks to process. -var BlockWorkQueue = make(chan BlockWork, BlockQueueSize) +var blockWorkQueue = make(chan BlockWork, BlockQueueSize) // TransactionWorkQueue is a queue of transactions to process. -var TransactionWorkQueue = make(chan TransactionWork, TransactionQueueSize) +var transactionWorkQueue = make(chan TransactionWork, TransactionQueueSize) + +// QuitChan kills the app worker. +var quitChan = make(chan bool) // TransactionWork holds a new transaction job, and a Responder for // sending results. @@ -35,3 +43,54 @@ type BlockWork struct { *blockchain.Block Responder } + +// HandleWork continually collects new work from existing work channels. +func HandleWork(app *App) { + log.Debug("Worker waiting for work.") + for { + select { + case work := <-transactionWorkQueue: + HandleTransaction(app, work) + case work := <-blockWorkQueue: + HandleBlock(app, work) + case <-quitChan: + return + } + } +} + +// HandleTransaction handles new instance of TransactionWork. +func HandleTransaction(app *App, work TransactionWork) { + validTransaction := tpool.Set(work.Transaction, chain) + + // Respond to the request if a response method was provided. + if work.Responder != nil { + work.Responder.Lock() + defer work.Responder.Unlock() + work.Responder.Send(validTransaction) + } +} + +// HandleBlock handles new instance of BlockWork. +func HandleBlock(app *App, work BlockWork) { + validBlock := tpool.Update(work.Block, chain) + + if validBlock { + // Append to the chain before requesting + // the next block so that the block + // numbers make sense. + chain.AppendBlock(work.Block) + address := app.CurrentUser.Wallet.Public() + blk := tpool.NextBlock(chain, address, app.CurrentUser.BlockSize) + if miner.IsMining() { + miner.RestartMiner(chain, blk) + } + } + + // Respond to the request if a response method was provided. + if work.Responder != nil { + work.Responder.Lock() + defer work.Responder.Unlock() + work.Responder.Send(validBlock) + } +} diff --git a/app/worker.go b/app/worker.go deleted file mode 100644 index 2e4bbcf..0000000 --- a/app/worker.go +++ /dev/null @@ -1,98 +0,0 @@ -package app - -import ( - log "github.com/Sirupsen/logrus" - "github.com/ubclaunchpad/cumulus/miner" -) - -// nWorkers is how many workers this node has. -const nWorkers = 10 - -// Workers is a list of workers. -var workers [nWorkers]*AppWorker - -// WorkerQueue is the channel of workers handling work. -var WorkerQueue chan AppWorker - -// QuitChan is the channel we use to kill the workers. -var QuitChan chan int - -// Worker is an interface for the basic app worker tasks. -type Worker interface { - HandleTransaction(work TransactionWork) - HandleBlock(work BlockWork) -} - -// AppWorker implements the basic worker. -type AppWorker struct { - ID int - log.FieldLogger -} - -// NewWorker returns a new AppWorker object. -func NewWorker(id int) AppWorker { - return AppWorker{ - ID: id, - FieldLogger: log.WithField("id", id), - } -} - -// Start continually collects new work from existing work channels. -func (w AppWorker) Start() { - go func() { - for { - // Wait for work. - log.WithFields(log.Fields{ - "id": w.ID, - }).Debug("Worker waiting for work") - select { - case work := <-TransactionWorkQueue: - w.FieldLogger.Debug("Worker handling new transaction work") - w.HandleTransaction(work) - case work := <-BlockWorkQueue: - w.FieldLogger.Debug("Worker handling new block work") - w.HandleBlock(work) - case <-QuitChan: - w.FieldLogger.Debug("Worker quitting") - return - } - } - }() -} - -// HandleTransaction handles new instance of TransactionWork. -func (w *AppWorker) HandleTransaction(work TransactionWork) { - validTransaction := tpool.Set(work.Transaction, chain) - - // Respond to the request if a response method was provided. - if work.Responder != nil { - work.Responder.Lock() - defer work.Responder.Unlock() - work.Responder.Send(validTransaction) - } -} - -// HandleBlock handles new instance of BlockWork. -func (w *AppWorker) HandleBlock(work BlockWork) { - validBlock := tpool.Update(work.Block, chain) - - if validBlock { - user := GetCurrentUser() - // Append to the chain before requesting - // the next block so that the block - // numbers make sense. - chain.AppendBlock(work.Block) - address := user.Wallet.Public() - blk := tpool.NextBlock(chain, address, user.BlockSize) - if miner.IsMining() { - miner.RestartMiner(chain, blk) - } - } - - // Respond to the request if a response method was provided. - if work.Responder != nil { - work.Responder.Lock() - defer work.Responder.Unlock() - work.Responder.Send(validBlock) - } -} diff --git a/app/worker_test.go b/app/worker_test.go index 7d03747..2b01087 100644 --- a/app/worker_test.go +++ b/app/worker_test.go @@ -51,7 +51,8 @@ func reset() { tpool = pool.New() chain, legitBlock = bc.NewValidTestChainAndBlock() legitTransaction = legitBlock.Transactions[1] - realWorker = NewWorker(7) + a := App{nil, NewUser()} + realWorker = NewWorker(0, a) mockResponder = MockResponder{ Mutex: &sync.Mutex{}, Result: false, @@ -77,7 +78,7 @@ func reset() { func TestNewWorker(t *testing.T) { reset() - if realWorker.ID != 7 { + if realWorker.ID != 0 { t.FailNow() } } diff --git a/blockchain/block.go b/blockchain/block.go index 4d7a9d8..3d8bfd4 100644 --- a/blockchain/block.go +++ b/blockchain/block.go @@ -9,6 +9,9 @@ import ( "github.com/ubclaunchpad/cumulus/common/util" ) +// DefaultBlockSize is the default block size, can be augmented by the user. +const DefaultBlockSize = 1 << 18 + // BlockHeader contains metadata about a block type BlockHeader struct { // BlockNumber is the position of the block within the blockchain From c0bf602723e4962cb63dfe8da2c9133a74f02b58 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Sun, 23 Jul 2017 20:42:59 -0700 Subject: [PATCH 02/11] worker :gun: --- app/app.go | 44 +++++------ app/app_test.go | 111 ++++++++------------------- app/user.go | 7 ++ app/work.go | 18 ++--- app/{worker_test.go => work_test.go} | 9 +-- 5 files changed, 67 insertions(+), 122 deletions(-) rename app/{worker_test.go => work_test.go} (96%) diff --git a/app/app.go b/app/app.go index 7856883..ce5d1bc 100644 --- a/app/app.go +++ b/app/app.go @@ -19,16 +19,15 @@ import ( ) var ( - chain *blockchain.BlockChain logFile = os.Stdout - // A reference to the transaction pool - tpool *pool.Pool ) // App contains information about a running instance of a Cumulus node type App struct { - PeerStore *peer.PeerStore CurrentUser *User + PeerStore *peer.PeerStore + Chain *blockchain.BlockChain + Pool *pool.Pool } // Run sets up and starts a new Cumulus node with the @@ -40,7 +39,9 @@ func Run(cfg conf.Config) { addr := fmt.Sprintf("%s:%d", config.Interface, config.Port) a := App{ PeerStore: peer.NewPeerStore(addr), - CurrentUser: NewUser(), + CurrentUser: getCurrentUser(), + Chain: getLocalChain(), + Pool: getLocalPool(), } // Set logging level @@ -67,9 +68,8 @@ func Run(cfg conf.Config) { }() // Below we'll connect to peers. After which, requests could begin to - // stream in. We should first initalize our pool, worker to handle - // incoming messages. - initializeNode(&a) + // stream in. Kick off a worker to handle requests and pushes. + go HandleWork(&a) // Set Peer default Push and Request handlers. These functions will handle // request and push messages from all peers we connect to unless overridden @@ -92,12 +92,6 @@ func Run(cfg conf.Config) { // peer is running go a.PeerStore.MaintainConnections(wg) - // Request the blockchain. - if chain == nil { - log.Info("Request blockchain from peers not yet implemented.") - initializeChain() - } - // Wait for goroutines to start wg.Wait() @@ -155,7 +149,7 @@ func (a *App) RequestHandler(req *msg.Request) msg.Response { blockNumber, ok := req.Params["blockNumber"].(uint32) if ok { // If its ok, we make try to a copy of it. - blk, err := chain.CopyBlockByIndex(blockNumber) + blk, err := a.Chain.CopyBlockByIndex(blockNumber) if err != nil { // Bad index parameter. res.Error = notFoundErr @@ -199,16 +193,16 @@ func (a *App) PushHandler(push *msg.Push) { } } -// initializeNode creates a transaction pool, workers and queues to handle -// incoming messages. -func initializeNode(app *App) { - tpool = pool.New() - go HandleWork(app) +// getLocalChain returns an instance of the blockchain. +func getLocalChain() *blockchain.BlockChain { + // TODO: Look for local chain on disk. If doesn't exist, go rummaging + // around on the internets for one. + bc, _ := blockchain.NewValidTestChainAndBlock() + return bc } -// initializeChain creates the blockchain for the node. -func initializeChain() { - chain, _ = blockchain.NewValidTestChainAndBlock() - // TODO: Check if chain exists on disk. - // TODO: If not, request chain from peers. +// getLocalPool returns an instance of the pool. +func getLocalPool() *pool.Pool { + // TODO: Look for local pool on disk. If doesn't exist, make a new one. + return pool.New() } diff --git a/app/app_test.go b/app/app_test.go index 6e1083b..bc22994 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -2,148 +2,100 @@ package app import ( "testing" - "time" log "github.com/Sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/ubclaunchpad/cumulus/blockchain" "github.com/ubclaunchpad/cumulus/msg" "github.com/ubclaunchpad/cumulus/peer" + "github.com/ubclaunchpad/cumulus/pool" ) func init() { log.SetLevel(log.InfoLevel) } -func createNewBlockRequest(blockNumber interface{}) *msg.Request { +func createNewBlockRequest(lastBlock interface{}) *msg.Request { params := make(map[string]interface{}, 1) - params["blockNumber"] = blockNumber + params["lastBlock"] = lastBlock return &msg.Request{ ResourceType: msg.ResourceBlock, Params: params, } } -func TestKillWorkers(t *testing.T) { - intializeQueues() - time.Sleep(20 * time.Millisecond) - for i := 0; i < nWorkers; i++ { - if workers[i] != nil { - t.FailNow() - } +func createNewApp() *App { + chain, _ := blockchain.NewValidTestChainAndBlock() + return &App{ + PeerStore: peer.NewPeerStore("127.0.0.1:8000"), + CurrentUser: NewUser(), + Chain: chain, + Pool: pool.New(), } - initializeWorkers() - time.Sleep(20 * time.Millisecond) - for i := 0; i < nWorkers; i++ { - if workers[i] == nil { - t.FailNow() - } - } - killWorkers() - time.Sleep(20 * time.Millisecond) - for i := 0; i < nWorkers; i++ { - if workers[i] != nil { - t.FailNow() - } - } -} - -func TestInitializeNode(t *testing.T) { - initializeNode() - if tpool == nil { - t.FailNow() - } - if BlockWorkQueue == nil { - t.FailNow() - } - if TransactionWorkQueue == nil { - t.FailNow() - } - killWorkers() } func TestPushHandlerNewBlock(t *testing.T) { - intializeQueues() + // Should put a block in the blockWorkQueue. + a := createNewApp() _, b := blockchain.NewValidTestChainAndBlock() push := msg.Push{ ResourceType: msg.ResourceBlock, Resource: b, } - a := App{ - PeerStore: peer.NewPeerStore("127.0.0.1:8000"), - } a.PushHandler(&push) select { - case work, ok := <-BlockWorkQueue: - if !ok { - t.FailNow() - } - if work.Block != b { - t.FailNow() - } + case work, ok := <-blockWorkQueue: + assert.True(t, ok) + assert.Equal(t, work.Block, b) } - // Add more here... } func TestPushHandlerNewTestTransaction(t *testing.T) { - intializeQueues() + // Should put a transaction in the transactionWorkQueue. + a := createNewApp() txn := blockchain.NewTestTransaction() push := msg.Push{ ResourceType: msg.ResourceTransaction, Resource: txn, } - a := App{ - PeerStore: peer.NewPeerStore("127.0.0.1:8000"), - } a.PushHandler(&push) select { - case work, ok := <-TransactionWorkQueue: - if !ok { - t.FailNow() - } - if work.Transaction != txn { - t.FailNow() - } + case work, ok := <-transactionWorkQueue: + assert.True(t, ok) + assert.Equal(t, work.Transaction, txn) } - // Add more here... } func TestRequestHandlerNewBlockOK(t *testing.T) { - initializeChain() - a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} - - // Set up a request (requesting block 0) - blockNumber := uint32(0) - req := createNewBlockRequest(blockNumber) + // Request a new block by hash and verify we get the right one. + a := createNewApp() + req := createNewBlockRequest(a.Chain.Blocks[1].LastBlock) resp := a.RequestHandler(req) block, ok := resp.Resource.(*blockchain.Block) // Assertion time! assert.True(t, ok, "resource should contain block") - assert.Equal(t, block.BlockNumber, blockNumber, - "block number should be "+string(blockNumber)) + assert.Equal(t, block, a.Chain.Blocks[1]) } func TestRequestHandlerNewBlockBadParams(t *testing.T) { - initializeChain() - a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} + a := createNewApp() // Set up a request. - blockNumber := "definitelynotanindex" - req := createNewBlockRequest(blockNumber) + hash := "definitelynotahash" + req := createNewBlockRequest(hash) resp := a.RequestHandler(req) block, ok := resp.Resource.(*blockchain.Block) // Make sure request failed. assert.False(t, ok, "resource should not contain block") - assert.Nil(t, block, "resource should not contain block") + assert.Equal(t, resp.Error.Code, msg.ResourceNotFound, resp.Error.Message) } func TestRequestHandlerNewBlockBadType(t *testing.T) { - initializeChain() - a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} + a := createNewApp() // Set up a request. req := createNewBlockRequest("doesntmatter") @@ -154,12 +106,11 @@ func TestRequestHandlerNewBlockBadType(t *testing.T) { // Make sure request failed. assert.False(t, ok, "resource should not contain block") - assert.Nil(t, block, "resource should not contain block") + assert.Equal(t, resp.Error.Code, msg.InvalidResourceType, resp.Error.Message) } func TestRequestHandlerPeerInfo(t *testing.T) { - initializeChain() - a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} + a := createNewApp() // Set up a request. req := createNewBlockRequest("doesntmatter") diff --git a/app/user.go b/app/user.go index 9c078a1..8f9a490 100644 --- a/app/user.go +++ b/app/user.go @@ -24,3 +24,10 @@ func NewUser() *User { BlockSize: blockchain.DefaultBlockSize, } } + +// getCurrentUser gets the current user. +func getCurrentUser() *User { + // TODO: Check for local user information on disk, + // If doesnt exist, create new user. + return NewUser() +} diff --git a/app/work.go b/app/work.go index 2afb8e6..a7c4a86 100644 --- a/app/work.go +++ b/app/work.go @@ -9,9 +9,9 @@ import ( const ( // BlockQueueSize is the size of the BlockQueue channel. - BlockQueueSize = 100 + blockQueueSize = 100 // TransactionQueueSize is the size of the BlockQueue channel. - TransactionQueueSize = 100 + transactionQueueSize = 100 ) // Responder is used to handle requests who require a response. @@ -22,10 +22,10 @@ type Responder interface { } // BlockWorkQueue is a queue of blocks to process. -var blockWorkQueue = make(chan BlockWork, BlockQueueSize) +var blockWorkQueue = make(chan BlockWork, blockQueueSize) // TransactionWorkQueue is a queue of transactions to process. -var transactionWorkQueue = make(chan TransactionWork, TransactionQueueSize) +var transactionWorkQueue = make(chan TransactionWork, transactionQueueSize) // QuitChan kills the app worker. var quitChan = make(chan bool) @@ -61,7 +61,7 @@ func HandleWork(app *App) { // HandleTransaction handles new instance of TransactionWork. func HandleTransaction(app *App, work TransactionWork) { - validTransaction := tpool.Set(work.Transaction, chain) + validTransaction := app.Pool.Set(work.Transaction, app.Chain) // Respond to the request if a response method was provided. if work.Responder != nil { @@ -73,17 +73,17 @@ func HandleTransaction(app *App, work TransactionWork) { // HandleBlock handles new instance of BlockWork. func HandleBlock(app *App, work BlockWork) { - validBlock := tpool.Update(work.Block, chain) + validBlock := app.Pool.Update(work.Block, app.Chain) if validBlock { // Append to the chain before requesting // the next block so that the block // numbers make sense. - chain.AppendBlock(work.Block) + app.Chain.AppendBlock(work.Block) address := app.CurrentUser.Wallet.Public() - blk := tpool.NextBlock(chain, address, app.CurrentUser.BlockSize) + blk := app.Pool.NextBlock(app.Chain, address, app.CurrentUser.BlockSize) if miner.IsMining() { - miner.RestartMiner(chain, blk) + miner.RestartMiner(app.Chain, blk) } } diff --git a/app/worker_test.go b/app/work_test.go similarity index 96% rename from app/worker_test.go rename to app/work_test.go index 2b01087..b07be16 100644 --- a/app/worker_test.go +++ b/app/work_test.go @@ -76,15 +76,8 @@ func reset() { QuitChan = make(chan int) } -func TestNewWorker(t *testing.T) { - reset() - if realWorker.ID != 0 { - t.FailNow() - } -} - func TestHandleTransactionOK(t *testing.T) { - reset() + a := createNewApp() realWorker.HandleTransaction(goodTxnWork) if mockResponder.Result != true { t.FailNow() From 1ec1f91ba89576afac5fde99d83bfbd4c858993c Mon Sep 17 00:00:00 2001 From: chadlagore Date: Mon, 24 Jul 2017 22:51:20 -0700 Subject: [PATCH 03/11] removed work --- app/app.go | 70 +++++++++++++++++++++- app/app_test.go | 108 ++++++++++++++++++++++++---------- app/test_utils.go | 1 + app/work.go | 96 ------------------------------ app/work_test.go | 145 ---------------------------------------------- 5 files changed, 147 insertions(+), 273 deletions(-) create mode 100644 app/test_utils.go delete mode 100644 app/work.go delete mode 100644 app/work_test.go diff --git a/app/app.go b/app/app.go index ce5d1bc..cb1bd6e 100644 --- a/app/app.go +++ b/app/app.go @@ -13,6 +13,7 @@ import ( "github.com/ubclaunchpad/cumulus/blockchain" "github.com/ubclaunchpad/cumulus/conf" "github.com/ubclaunchpad/cumulus/conn" + "github.com/ubclaunchpad/cumulus/miner" "github.com/ubclaunchpad/cumulus/msg" "github.com/ubclaunchpad/cumulus/peer" "github.com/ubclaunchpad/cumulus/pool" @@ -22,6 +23,13 @@ var ( logFile = os.Stdout ) +const ( + // BlockQueueSize is the size of the BlockQueue channel. + blockQueueSize = 100 + // TransactionQueueSize is the size of the BlockQueue channel. + transactionQueueSize = 100 +) + // App contains information about a running instance of a Cumulus node type App struct { CurrentUser *User @@ -30,6 +38,22 @@ type App struct { Pool *pool.Pool } +// Responder is used to handle requests who require a response. +type Responder interface { + Send(ok bool) + Lock() + Unlock() +} + +// BlockWorkQueue is a queue of blocks to process. +var blockQueue = make(chan *blockchain.Block, blockQueueSize) + +// TransactionWorkQueue is a queue of transactions to process. +var transactionQueue = make(chan *blockchain.Transaction, transactionQueueSize) + +// QuitChan kills the app worker. +var quitChan = make(chan bool) + // Run sets up and starts a new Cumulus node with the // given configuration. This should only be called once (except in tests) func Run(cfg conf.Config) { @@ -69,7 +93,7 @@ func Run(cfg conf.Config) { // Below we'll connect to peers. After which, requests could begin to // stream in. Kick off a worker to handle requests and pushes. - go HandleWork(&a) + go a.HandleWork() // Set Peer default Push and Request handlers. These functions will handle // request and push messages from all peers we connect to unless overridden @@ -176,7 +200,7 @@ func (a *App) PushHandler(push *msg.Push) { blk, ok := push.Resource.(*blockchain.Block) if ok { log.Info("Adding block to work queue.") - blockWorkQueue <- BlockWork{blk, nil} + blockQueue <- blk } else { log.Error("Could not cast resource to block.") } @@ -184,7 +208,7 @@ func (a *App) PushHandler(push *msg.Push) { txn, ok := push.Resource.(*blockchain.Transaction) if ok { log.Info("Adding transaction to work queue.") - transactionWorkQueue <- TransactionWork{txn, nil} + transactionQueue <- txn } else { log.Error("Could not cast resource to transaction.") } @@ -206,3 +230,43 @@ func getLocalPool() *pool.Pool { // TODO: Look for local pool on disk. If doesn't exist, make a new one. return pool.New() } + +// HandleWork continually collects new work from existing work channels. +func (a *App) HandleWork() { + log.Debug("Worker waiting for work.") + for { + select { + case work := <-transactionQueue: + a.HandleTransaction(work) + case work := <-blockQueue: + a.HandleBlock(work) + case <-quitChan: + return + } + } +} + +// HandleTransaction handles new instance of TransactionWork. +func (a *App) HandleTransaction(txn *blockchain.Transaction) { + validTransaction := a.Pool.Set(txn, a.Chain) + if !validTransaction { + log.Info("added transaction to pool from: " + txn.Sender) + } +} + +// HandleBlock handles new instance of BlockWork. +func (a *App) HandleBlock(blk *blockchain.Block) { + validBlock := a.Pool.Update(blk, a.Chain) + + if validBlock { + // Append to the chain before requesting + // the next block so that the block + // numbers make sense. + a.Chain.AppendBlock(blk) + address := a.CurrentUser.Wallet.Public() + blk := a.Pool.NextBlock(a.Chain, address, a.CurrentUser.BlockSize) + if miner.IsMining() { + miner.RestartMiner(a.Chain, blk) + } + } +} diff --git a/app/app_test.go b/app/app_test.go index bc22994..27d9c94 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -2,6 +2,7 @@ package app import ( "testing" + "time" log "github.com/Sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -15,28 +16,9 @@ func init() { log.SetLevel(log.InfoLevel) } -func createNewBlockRequest(lastBlock interface{}) *msg.Request { - params := make(map[string]interface{}, 1) - params["lastBlock"] = lastBlock - return &msg.Request{ - ResourceType: msg.ResourceBlock, - Params: params, - } -} - -func createNewApp() *App { - chain, _ := blockchain.NewValidTestChainAndBlock() - return &App{ - PeerStore: peer.NewPeerStore("127.0.0.1:8000"), - CurrentUser: NewUser(), - Chain: chain, - Pool: pool.New(), - } -} - func TestPushHandlerNewBlock(t *testing.T) { // Should put a block in the blockWorkQueue. - a := createNewApp() + a := createNewTestApp() _, b := blockchain.NewValidTestChainAndBlock() push := msg.Push{ ResourceType: msg.ResourceBlock, @@ -52,7 +34,7 @@ func TestPushHandlerNewBlock(t *testing.T) { func TestPushHandlerNewTestTransaction(t *testing.T) { // Should put a transaction in the transactionWorkQueue. - a := createNewApp() + a := createNewTestApp() txn := blockchain.NewTestTransaction() push := msg.Push{ ResourceType: msg.ResourceTransaction, @@ -68,9 +50,9 @@ func TestPushHandlerNewTestTransaction(t *testing.T) { func TestRequestHandlerNewBlockOK(t *testing.T) { // Request a new block by hash and verify we get the right one. - a := createNewApp() + a := createNewTestApp() - req := createNewBlockRequest(a.Chain.Blocks[1].LastBlock) + req := createNewTestBlockRequest(a.Chain.Blocks[1].LastBlock) resp := a.RequestHandler(req) block, ok := resp.Resource.(*blockchain.Block) @@ -80,11 +62,11 @@ func TestRequestHandlerNewBlockOK(t *testing.T) { } func TestRequestHandlerNewBlockBadParams(t *testing.T) { - a := createNewApp() + a := createNewTestApp() // Set up a request. hash := "definitelynotahash" - req := createNewBlockRequest(hash) + req := createNewTestBlockRequest(hash) resp := a.RequestHandler(req) block, ok := resp.Resource.(*blockchain.Block) @@ -95,10 +77,10 @@ func TestRequestHandlerNewBlockBadParams(t *testing.T) { } func TestRequestHandlerNewBlockBadType(t *testing.T) { - a := createNewApp() + a := createNewTestApp() // Set up a request. - req := createNewBlockRequest("doesntmatter") + req := createNewTestBlockRequest("doesntmatter") req.ResourceType = 25 resp := a.RequestHandler(req) @@ -110,10 +92,10 @@ func TestRequestHandlerNewBlockBadType(t *testing.T) { } func TestRequestHandlerPeerInfo(t *testing.T) { - a := createNewApp() + a := createNewTestApp() // Set up a request. - req := createNewBlockRequest("doesntmatter") + req := createNewTestBlockRequest("doesntmatter") req.ResourceType = msg.ResourcePeerInfo resp := a.RequestHandler(req) @@ -123,3 +105,71 @@ func TestRequestHandlerPeerInfo(t *testing.T) { assert.NotNil(t, res, "resource should contain peer info") // Assert peer address returned valid. } + +func TestHandleTransactionOK(t *testing.T) { + a := createNewTestApp() + HandleTransaction(goodTxnWork) + if mockResponder.Result != true { + t.FailNow() + } +} + +func TestHandleTransactionNotOK(t *testing.T) { + reset() + realWorker.HandleTransaction(badTxnWork) + if mockResponder.Result != false { + t.FailNow() + } +} + +func TestHandleBlockOK(t *testing.T) { + reset() + realWorker.HandleBlock(goodBlkWork) + if mockResponder.Result != true { + t.FailNow() + } +} + +func TestHandleBlockNotOK(t *testing.T) { + reset() + realWorker.HandleBlock(badBlkWork) + if mockResponder.Result != false { + t.FailNow() + } +} + +func TestStartTxn(t *testing.T) { + reset() + realWorker.Start() + TransactionWorkQueue <- goodTxnWork + time.Sleep(50 * time.Millisecond) + mockResponder.Lock() + if !mockResponder.Result { + t.FailNow() + } + mockResponder.Unlock() +} + +func TestStartBlk(t *testing.T) { + reset() + realWorker.Start() + BlockWorkQueue <- goodBlkWork + time.Sleep(50 * time.Millisecond) + mockResponder.Lock() + if !mockResponder.Result { + t.FailNow() + } + mockResponder.Unlock() +} + +func TestQuitWorker(t *testing.T) { + reset() + for i := 0; i < nWorkers; i++ { + NewWorker(i).Start() + } + + // Would hang if quit call fails, and travis would fail. + for i := 0; i < nWorkers; i++ { + QuitChan <- i + } +} diff --git a/app/test_utils.go b/app/test_utils.go new file mode 100644 index 0000000..4879f7a --- /dev/null +++ b/app/test_utils.go @@ -0,0 +1 @@ +package app diff --git a/app/work.go b/app/work.go deleted file mode 100644 index a7c4a86..0000000 --- a/app/work.go +++ /dev/null @@ -1,96 +0,0 @@ -package app - -import ( - "github.com/ubclaunchpad/cumulus/blockchain" - "github.com/ubclaunchpad/cumulus/miner" - - log "github.com/Sirupsen/logrus" -) - -const ( - // BlockQueueSize is the size of the BlockQueue channel. - blockQueueSize = 100 - // TransactionQueueSize is the size of the BlockQueue channel. - transactionQueueSize = 100 -) - -// Responder is used to handle requests who require a response. -type Responder interface { - Send(ok bool) - Lock() - Unlock() -} - -// BlockWorkQueue is a queue of blocks to process. -var blockWorkQueue = make(chan BlockWork, blockQueueSize) - -// TransactionWorkQueue is a queue of transactions to process. -var transactionWorkQueue = make(chan TransactionWork, transactionQueueSize) - -// QuitChan kills the app worker. -var quitChan = make(chan bool) - -// TransactionWork holds a new transaction job, and a Responder for -// sending results. -type TransactionWork struct { - *blockchain.Transaction - Responder -} - -// BlockWork holds a new block job, and a Responder for -// sending results. -type BlockWork struct { - *blockchain.Block - Responder -} - -// HandleWork continually collects new work from existing work channels. -func HandleWork(app *App) { - log.Debug("Worker waiting for work.") - for { - select { - case work := <-transactionWorkQueue: - HandleTransaction(app, work) - case work := <-blockWorkQueue: - HandleBlock(app, work) - case <-quitChan: - return - } - } -} - -// HandleTransaction handles new instance of TransactionWork. -func HandleTransaction(app *App, work TransactionWork) { - validTransaction := app.Pool.Set(work.Transaction, app.Chain) - - // Respond to the request if a response method was provided. - if work.Responder != nil { - work.Responder.Lock() - defer work.Responder.Unlock() - work.Responder.Send(validTransaction) - } -} - -// HandleBlock handles new instance of BlockWork. -func HandleBlock(app *App, work BlockWork) { - validBlock := app.Pool.Update(work.Block, app.Chain) - - if validBlock { - // Append to the chain before requesting - // the next block so that the block - // numbers make sense. - app.Chain.AppendBlock(work.Block) - address := app.CurrentUser.Wallet.Public() - blk := app.Pool.NextBlock(app.Chain, address, app.CurrentUser.BlockSize) - if miner.IsMining() { - miner.RestartMiner(app.Chain, blk) - } - } - - // Respond to the request if a response method was provided. - if work.Responder != nil { - work.Responder.Lock() - defer work.Responder.Unlock() - work.Responder.Send(validBlock) - } -} diff --git a/app/work_test.go b/app/work_test.go deleted file mode 100644 index b07be16..0000000 --- a/app/work_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package app - -import ( - "sync" - "testing" - "time" - - bc "github.com/ubclaunchpad/cumulus/blockchain" - "github.com/ubclaunchpad/cumulus/pool" -) - -var ( - legitBlock *bc.Block - legitTransaction *bc.Transaction - realWorker AppWorker - txnWork TransactionWork - mockResponder MockResponder - badTxnWork TransactionWork - goodTxnWork TransactionWork - badBlkWork BlockWork - goodBlkWork BlockWork - badTxn *bc.Transaction - badBlk *bc.Block -) - -type MockResponder struct { - Result bool - NumCalls int - *sync.Mutex -} - -func (r *MockResponder) Send(ok bool) { - r.Result = ok - r.NumCalls++ -} - -func (r *MockResponder) Lock() { - r.Mutex.Lock() -} - -func (r *MockResponder) Unlock() { - r.Mutex.Unlock() -} - -func init() { - badTxn = bc.NewTestTransaction() - badBlk = bc.NewTestBlock() -} - -func reset() { - tpool = pool.New() - chain, legitBlock = bc.NewValidTestChainAndBlock() - legitTransaction = legitBlock.Transactions[1] - a := App{nil, NewUser()} - realWorker = NewWorker(0, a) - mockResponder = MockResponder{ - Mutex: &sync.Mutex{}, - Result: false, - } - goodTxnWork = TransactionWork{ - Transaction: legitTransaction, - Responder: &mockResponder, - } - badTxnWork = TransactionWork{ - Transaction: badTxn, - Responder: &mockResponder, - } - goodBlkWork = BlockWork{ - Block: legitBlock, - Responder: &mockResponder, - } - badBlkWork = BlockWork{ - Block: badBlk, - Responder: &mockResponder, - } - QuitChan = make(chan int) -} - -func TestHandleTransactionOK(t *testing.T) { - a := createNewApp() - realWorker.HandleTransaction(goodTxnWork) - if mockResponder.Result != true { - t.FailNow() - } -} - -func TestHandleTransactionNotOK(t *testing.T) { - reset() - realWorker.HandleTransaction(badTxnWork) - if mockResponder.Result != false { - t.FailNow() - } -} - -func TestHandleBlockOK(t *testing.T) { - reset() - realWorker.HandleBlock(goodBlkWork) - if mockResponder.Result != true { - t.FailNow() - } -} - -func TestHandleBlockNotOK(t *testing.T) { - reset() - realWorker.HandleBlock(badBlkWork) - if mockResponder.Result != false { - t.FailNow() - } -} - -func TestStartTxn(t *testing.T) { - reset() - realWorker.Start() - TransactionWorkQueue <- goodTxnWork - time.Sleep(50 * time.Millisecond) - mockResponder.Lock() - if !mockResponder.Result { - t.FailNow() - } - mockResponder.Unlock() -} - -func TestStartBlk(t *testing.T) { - reset() - realWorker.Start() - BlockWorkQueue <- goodBlkWork - time.Sleep(50 * time.Millisecond) - mockResponder.Lock() - if !mockResponder.Result { - t.FailNow() - } - mockResponder.Unlock() -} - -func TestQuitWorker(t *testing.T) { - reset() - for i := 0; i < nWorkers; i++ { - NewWorker(i).Start() - } - - // Would hang if quit call fails, and travis would fail. - for i := 0; i < nWorkers; i++ { - QuitChan <- i - } -} From 1919c9932a5977e9b1165e77f36bebb7752f7300 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Mon, 24 Jul 2017 23:23:47 -0700 Subject: [PATCH 04/11] made test_utils for app; add empty function for pool --- app/app.go | 5 ++- app/app_test.go | 97 +++++++++++++++++++---------------------------- app/test_utils.go | 26 +++++++++++++ pool/pool.go | 5 +++ 4 files changed, 73 insertions(+), 60 deletions(-) diff --git a/app/app.go b/app/app.go index cb1bd6e..7b9b52c 100644 --- a/app/app.go +++ b/app/app.go @@ -250,7 +250,9 @@ func (a *App) HandleWork() { func (a *App) HandleTransaction(txn *blockchain.Transaction) { validTransaction := a.Pool.Set(txn, a.Chain) if !validTransaction { - log.Info("added transaction to pool from: " + txn.Sender) + log.Debug("added transaction to pool from address: " + txn.Sender.Repr()) + } else { + log.Debug("bad transaction rejected from sender: " + txn.Sender.Repr()) } } @@ -268,5 +270,6 @@ func (a *App) HandleBlock(blk *blockchain.Block) { if miner.IsMining() { miner.RestartMiner(a.Chain, blk) } + log.Debug("added blk number %d to chain with hash", blk.BlockNumber) } } diff --git a/app/app_test.go b/app/app_test.go index 27d9c94..0b0e04f 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -2,14 +2,11 @@ package app import ( "testing" - "time" log "github.com/Sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/ubclaunchpad/cumulus/blockchain" "github.com/ubclaunchpad/cumulus/msg" - "github.com/ubclaunchpad/cumulus/peer" - "github.com/ubclaunchpad/cumulus/pool" ) func init() { @@ -26,9 +23,9 @@ func TestPushHandlerNewBlock(t *testing.T) { } a.PushHandler(&push) select { - case work, ok := <-blockWorkQueue: + case blk, ok := <-blockQueue: assert.True(t, ok) - assert.Equal(t, work.Block, b) + assert.Equal(t, blk, b) } } @@ -42,9 +39,9 @@ func TestPushHandlerNewTestTransaction(t *testing.T) { } a.PushHandler(&push) select { - case work, ok := <-transactionWorkQueue: + case tr, ok := <-transactionQueue: assert.True(t, ok) - assert.Equal(t, work.Transaction, txn) + assert.Equal(t, tr, txn) } } @@ -69,7 +66,7 @@ func TestRequestHandlerNewBlockBadParams(t *testing.T) { req := createNewTestBlockRequest(hash) resp := a.RequestHandler(req) - block, ok := resp.Resource.(*blockchain.Block) + _, ok := resp.Resource.(*blockchain.Block) // Make sure request failed. assert.False(t, ok, "resource should not contain block") @@ -84,7 +81,7 @@ func TestRequestHandlerNewBlockBadType(t *testing.T) { req.ResourceType = 25 resp := a.RequestHandler(req) - block, ok := resp.Resource.(*blockchain.Block) + _, ok := resp.Resource.(*blockchain.Block) // Make sure request failed. assert.False(t, ok, "resource should not contain block") @@ -108,68 +105,50 @@ func TestRequestHandlerPeerInfo(t *testing.T) { func TestHandleTransactionOK(t *testing.T) { a := createNewTestApp() - HandleTransaction(goodTxnWork) - if mockResponder.Result != true { - t.FailNow() - } + bc, blk := blockchain.NewValidTestChainAndBlock() + a.Chain = bc + txn := blk.Transactions[0] + a.HandleTransaction(txn) + assert.Equal(t, a.Pool.Peek(), txn) } func TestHandleTransactionNotOK(t *testing.T) { - reset() - realWorker.HandleTransaction(badTxnWork) - if mockResponder.Result != false { - t.FailNow() - } + a := createNewTestApp() + a.HandleTransaction(blockchain.NewTestTransaction()) + assert.True(t, a.Pool.Empty()) } func TestHandleBlockOK(t *testing.T) { - reset() - realWorker.HandleBlock(goodBlkWork) - if mockResponder.Result != true { - t.FailNow() - } -} + a := createNewTestApp() + i := 0 -func TestHandleBlockNotOK(t *testing.T) { - reset() - realWorker.HandleBlock(badBlkWork) - if mockResponder.Result != false { - t.FailNow() - } -} + // TODO: Start miner. -func TestStartTxn(t *testing.T) { - reset() - realWorker.Start() - TransactionWorkQueue <- goodTxnWork - time.Sleep(50 * time.Millisecond) - mockResponder.Lock() - if !mockResponder.Result { - t.FailNow() + for i < 1000 { + a.Pool.SetUnsafe(blockchain.NewTestTransaction()) + i++ } - mockResponder.Unlock() -} -func TestStartBlk(t *testing.T) { - reset() - realWorker.Start() - BlockWorkQueue <- goodBlkWork - time.Sleep(50 * time.Millisecond) - mockResponder.Lock() - if !mockResponder.Result { - t.FailNow() - } - mockResponder.Unlock() + bc, blk := blockchain.NewValidTestChainAndBlock() + a.Chain = bc + a.HandleBlock(blk) + assert.Equal(t, blk, a.Chain.Blocks[2]) + + // TODO: Assert miner restarted. + // TODO: Assert pool appropriately emptied. } -func TestQuitWorker(t *testing.T) { - reset() - for i := 0; i < nWorkers; i++ { - NewWorker(i).Start() - } +func TestHandleBlockNotOK(t *testing.T) { + a := createNewTestApp() + i := 0 - // Would hang if quit call fails, and travis would fail. - for i := 0; i < nWorkers; i++ { - QuitChan <- i + // TODO: Start miner. + for i < 1000 { + a.Pool.SetUnsafe(blockchain.NewTestTransaction()) + i++ } + + a.HandleBlock(blockchain.NewTestBlock()) + // TODO: Assert miner not restarted. + // TODO: Assert pool untouched. } diff --git a/app/test_utils.go b/app/test_utils.go index 4879f7a..010a521 100644 --- a/app/test_utils.go +++ b/app/test_utils.go @@ -1 +1,27 @@ package app + +import ( + "github.com/ubclaunchpad/cumulus/blockchain" + "github.com/ubclaunchpad/cumulus/msg" + "github.com/ubclaunchpad/cumulus/peer" + "github.com/ubclaunchpad/cumulus/pool" +) + +func createNewTestBlockRequest(lastBlock interface{}) *msg.Request { + params := make(map[string]interface{}, 1) + params["lastBlock"] = lastBlock + return &msg.Request{ + ResourceType: msg.ResourceBlock, + Params: params, + } +} + +func createNewTestApp() *App { + chain, _ := blockchain.NewValidTestChainAndBlock() + return &App{ + PeerStore: peer.NewPeerStore("127.0.0.1:8000"), + CurrentUser: NewUser(), + Chain: chain, + Pool: pool.New(), + } +} diff --git a/pool/pool.go b/pool/pool.go index e90400a..e6c5b39 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -33,6 +33,11 @@ func (p *Pool) Len() int { return len(p.ValidTransactions) } +// Empty returns true if the pool has exactly zero transactions in it. +func (p *Pool) Empty() bool { + return p.Len() == 0 +} + // Get returns the tranasction with input transaction Hash h. func (p *Pool) Get(h blockchain.Hash) *blockchain.Transaction { return p.ValidTransactions[h].Transaction From 01f08ce7faf19047cd6af606c59d32ea4c946477 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Mon, 24 Jul 2017 23:27:28 -0700 Subject: [PATCH 05/11] add comment regarding TODO --- app/app.go | 1 + 1 file changed, 1 insertion(+) diff --git a/app/app.go b/app/app.go index 7b9b52c..04d5f8e 100644 --- a/app/app.go +++ b/app/app.go @@ -173,6 +173,7 @@ func (a *App) RequestHandler(req *msg.Request) msg.Response { blockNumber, ok := req.Params["blockNumber"].(uint32) if ok { // If its ok, we make try to a copy of it. + // TODO: Make this CopyBlockByHash. blk, err := a.Chain.CopyBlockByIndex(blockNumber) if err != nil { // Bad index parameter. From a15a6316bae28e26fdbd7d7a9eb86572285f1d83 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Tue, 25 Jul 2017 18:36:19 -0700 Subject: [PATCH 06/11] tests passing --- app/app.go | 2 +- app/app_test.go | 29 +++++++++++++++-------------- pool/pool.go | 8 ++++++-- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/app/app.go b/app/app.go index 04d5f8e..2c5d6ec 100644 --- a/app/app.go +++ b/app/app.go @@ -250,7 +250,7 @@ func (a *App) HandleWork() { // HandleTransaction handles new instance of TransactionWork. func (a *App) HandleTransaction(txn *blockchain.Transaction) { validTransaction := a.Pool.Set(txn, a.Chain) - if !validTransaction { + if validTransaction { log.Debug("added transaction to pool from address: " + txn.Sender.Repr()) } else { log.Debug("bad transaction rejected from sender: " + txn.Sender.Repr()) diff --git a/app/app_test.go b/app/app_test.go index 0b0e04f..2a0140a 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -10,7 +10,7 @@ import ( ) func init() { - log.SetLevel(log.InfoLevel) + log.SetLevel(log.DebugLevel) } func TestPushHandlerNewBlock(t *testing.T) { @@ -45,18 +45,18 @@ func TestPushHandlerNewTestTransaction(t *testing.T) { } } -func TestRequestHandlerNewBlockOK(t *testing.T) { - // Request a new block by hash and verify we get the right one. - a := createNewTestApp() +// TODO: Enable once block request by hash implemented. +// func TestRequestHandlerNewBlockOK(t *testing.T) { +// // Request a new block by hash and verify we get the right one. +// a := createNewTestApp() - req := createNewTestBlockRequest(a.Chain.Blocks[1].LastBlock) - resp := a.RequestHandler(req) - block, ok := resp.Resource.(*blockchain.Block) +// req := createNewTestBlockRequest(a.Chain.Blocks[1].LastBlock) +// resp := a.RequestHandler(req) +// block, ok := resp.Resource.(*blockchain.Block) - // Assertion time! - assert.True(t, ok, "resource should contain block") - assert.Equal(t, block, a.Chain.Blocks[1]) -} +// assert.True(t, ok, "resource should contain block") +// assert.Equal(t, block, a.Chain.Blocks[1]) +// } func TestRequestHandlerNewBlockBadParams(t *testing.T) { a := createNewTestApp() @@ -70,7 +70,7 @@ func TestRequestHandlerNewBlockBadParams(t *testing.T) { // Make sure request failed. assert.False(t, ok, "resource should not contain block") - assert.Equal(t, resp.Error.Code, msg.ResourceNotFound, resp.Error.Message) + assert.Equal(t, msg.ResourceNotFound, int(resp.Error.Code), resp.Error.Message) } func TestRequestHandlerNewBlockBadType(t *testing.T) { @@ -85,7 +85,7 @@ func TestRequestHandlerNewBlockBadType(t *testing.T) { // Make sure request failed. assert.False(t, ok, "resource should not contain block") - assert.Equal(t, resp.Error.Code, msg.InvalidResourceType, resp.Error.Message) + assert.Equal(t, msg.InvalidResourceType, int(resp.Error.Code), resp.Error.Message) } func TestRequestHandlerPeerInfo(t *testing.T) { @@ -107,8 +107,9 @@ func TestHandleTransactionOK(t *testing.T) { a := createNewTestApp() bc, blk := blockchain.NewValidTestChainAndBlock() a.Chain = bc - txn := blk.Transactions[0] + txn := blk.Transactions[1] a.HandleTransaction(txn) + assert.False(t, a.Pool.Empty()) assert.Equal(t, a.Pool.Peek(), txn) } diff --git a/pool/pool.go b/pool/pool.go index e6c5b39..66129b6 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -3,6 +3,8 @@ package pool import ( "time" + log "github.com/Sirupsen/logrus" + "github.com/ubclaunchpad/cumulus/blockchain" "github.com/ubclaunchpad/cumulus/common/util" "github.com/ubclaunchpad/cumulus/miner" @@ -69,11 +71,13 @@ func getIndex(a []*PooledTransaction, target time.Time, low, high int) int { // Set inserts a transaction into the pool, returning // true if the Transaction was inserted (was valid). func (p *Pool) Set(t *blockchain.Transaction, bc *blockchain.BlockChain) bool { - if ok, _ := bc.ValidTransaction(t); ok { + if ok, err := bc.ValidTransaction(t); ok { p.set(t) return true + } else { + log.Debug(err) + return false } - return false } // SetUnsafe adds a transaction to the pool without validation. From 99c22677e320d4667571c86d493fa20c4db34d97 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Tue, 25 Jul 2017 18:43:10 -0700 Subject: [PATCH 07/11] comments --- app/app.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/app/app.go b/app/app.go index 2c5d6ec..2649e6f 100644 --- a/app/app.go +++ b/app/app.go @@ -24,9 +24,7 @@ var ( ) const ( - // BlockQueueSize is the size of the BlockQueue channel. - blockQueueSize = 100 - // TransactionQueueSize is the size of the BlockQueue channel. + blockQueueSize = 100 transactionQueueSize = 100 ) @@ -271,6 +269,6 @@ func (a *App) HandleBlock(blk *blockchain.Block) { if miner.IsMining() { miner.RestartMiner(a.Chain, blk) } - log.Debug("added blk number %d to chain with hash", blk.BlockNumber) + log.Debug("added blk number %d to chain", blk.BlockNumber) } } From c5b2ab37e0bbeacefa8ccb7dbf091278492650b6 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Tue, 25 Jul 2017 18:44:04 -0700 Subject: [PATCH 08/11] remove responder --- app/app.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/app/app.go b/app/app.go index 2649e6f..ac47ea9 100644 --- a/app/app.go +++ b/app/app.go @@ -36,13 +36,6 @@ type App struct { Pool *pool.Pool } -// Responder is used to handle requests who require a response. -type Responder interface { - Send(ok bool) - Lock() - Unlock() -} - // BlockWorkQueue is a queue of blocks to process. var blockQueue = make(chan *blockchain.Block, blockQueueSize) From 815f38e1d71ab4b339b30b3b535b9d8bffd28998 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Tue, 25 Jul 2017 22:57:41 -0700 Subject: [PATCH 09/11] handle double spend within pool --- pool/pool.go | 5 +++++ pool/pool_test.go | 11 +++++++++++ 2 files changed, 16 insertions(+) diff --git a/pool/pool.go b/pool/pool.go index 66129b6..6ba7ac9 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -86,7 +86,12 @@ func (p *Pool) SetUnsafe(t *blockchain.Transaction) { } // Silently adds a transaction to the pool. +// Deletes a transaction if it exists from the same +// input hash. func (p *Pool) set(t *blockchain.Transaction) { + if txn, ok := p.ValidTransactions[t.Input.Hash]; ok { + p.Delete(txn.Transaction) + } vt := &PooledTransaction{ Transaction: t, Time: time.Now(), diff --git a/pool/pool_test.go b/pool/pool_test.go index 6710deb..5a9740f 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -121,3 +121,14 @@ func TestPop(t *testing.T) { p := New() assert.Nil(t, p.Pop()) } + +func TestSetDedupes(t *testing.T) { + p := New() + t1 := blockchain.NewTestTransaction() + t2 := blockchain.NewTestTransaction() + t1.Input.Hash = t2.Input.Hash + p.SetUnsafe(t1) + p.SetUnsafe(t2) + assert.Equal(t, p.Peek(), t2) + assert.Equal(t, p.Len(), 1) +} From 16dde33e0de3dc67a02e781cf790ad8baede88e2 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Tue, 25 Jul 2017 23:12:44 -0700 Subject: [PATCH 10/11] new tests on HandleWork --- app/app.go | 16 +++++----------- app/app_test.go | 22 ++++++++++++++++++++++ app/console.go | 26 ++++++++++++-------------- 3 files changed, 39 insertions(+), 25 deletions(-) diff --git a/app/app.go b/app/app.go index ac47ea9..5fd96b7 100644 --- a/app/app.go +++ b/app/app.go @@ -20,7 +20,10 @@ import ( ) var ( - logFile = os.Stdout + logFile = os.Stdout + blockQueue = make(chan *blockchain.Block, blockQueueSize) + transactionQueue = make(chan *blockchain.Transaction, transactionQueueSize) + quitChan = make(chan bool) ) const ( @@ -36,15 +39,6 @@ type App struct { Pool *pool.Pool } -// BlockWorkQueue is a queue of blocks to process. -var blockQueue = make(chan *blockchain.Block, blockQueueSize) - -// TransactionWorkQueue is a queue of transactions to process. -var transactionQueue = make(chan *blockchain.Transaction, transactionQueueSize) - -// QuitChan kills the app worker. -var quitChan = make(chan bool) - // Run sets up and starts a new Cumulus node with the // given configuration. This should only be called once (except in tests) func Run(cfg conf.Config) { @@ -121,7 +115,7 @@ func Run(cfg conf.Config) { } log.Info("Redirecting logs to logfile") log.SetOutput(logFile) - go RunConsole(a.PeerStore, &a) + go RunConsole(&a) } if len(config.Target) > 0 { diff --git a/app/app_test.go b/app/app_test.go index 2a0140a..534b406 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -153,3 +153,25 @@ func TestHandleBlockNotOK(t *testing.T) { // TODO: Assert miner not restarted. // TODO: Assert pool untouched. } + +func TestGetLocalPool(t *testing.T) { + assert.NotNil(t, getLocalPool()) +} + +func TestGetLocalChain(t *testing.T) { + assert.NotNil(t, getLocalChain()) +} + +func TestHandleBlock(t *testing.T) { + a := createNewTestApp() + go a.HandleWork() + blockQueue <- blockchain.NewTestBlock() + assert.Equal(t, len(blockQueue), 0) +} + +func TestHandleTransaction(t *testing.T) { + a := createNewTestApp() + go a.HandleWork() + transactionQueue <- blockchain.NewTestTransaction() + assert.Equal(t, len(transactionQueue), 0) +} diff --git a/app/console.go b/app/console.go index dcbef47..336519c 100644 --- a/app/console.go +++ b/app/console.go @@ -10,23 +10,21 @@ import ( ) var ( - shell *ishell.Shell - peerStore *peer.PeerStore + shell *ishell.Shell ) // RunConsole starts the Cumulus console. This should be run only once as a // goroutine, and logging should be redirected away from stdout before it is run. // It takes a pointer to a PeerStore so we can use the PeerStore to interact // with other peers and give the user info about the running instance. -func RunConsole(ps *peer.PeerStore, app *App) { - peerStore = ps +func RunConsole(a *App) { shell = ishell.New() shell.AddCmd(&ishell.Cmd{ Name: "create", Help: "create a new wallet hash or transaction", Func: func(ctx *ishell.Context) { - create(ctx, app) + create(ctx, a) }, }) shell.AddCmd(&ishell.Cmd{ @@ -38,21 +36,21 @@ func RunConsole(ps *peer.PeerStore, app *App) { Name: "address", Help: "show the address this host is listening on", Func: func(ctx *ishell.Context) { - listenAddr(ctx, peerStore) + listenAddr(ctx, a) }, }) shell.AddCmd(&ishell.Cmd{ Name: "peers", Help: "show the peers this host is connected to", Func: func(ctx *ishell.Context) { - peers(ctx, peerStore) + peers(ctx, a) }, }) shell.AddCmd(&ishell.Cmd{ Name: "connect", Help: "connect to another peer", Func: func(ctx *ishell.Context) { - connect(ctx, peerStore) + connect(ctx, a) }, }) @@ -97,22 +95,22 @@ func check(ctx *ishell.Context) { } } -func listenAddr(ctx *ishell.Context, peerStore *peer.PeerStore) { - shell.Println("Listening on", peerStore.ListenAddr) +func listenAddr(ctx *ishell.Context, a *App) { + shell.Println("Listening on", a.PeerStore.ListenAddr) } -func peers(tcx *ishell.Context, peerStore *peer.PeerStore) { - shell.Println("Connected to", peerStore.Addrs()) +func peers(tcx *ishell.Context, a *App) { + shell.Println("Connected to", a.PeerStore.Addrs()) } -func connect(ctx *ishell.Context, peerStore *peer.PeerStore) { +func connect(ctx *ishell.Context, a *App) { if len(ctx.Args) == 0 { shell.Println("Usage: connect [IP address]:[TCP port]") return } addr := ctx.Args[0] - _, err := peer.Connect(addr, peerStore) + _, err := peer.Connect(addr, a.PeerStore) if err != nil { shell.Println("Failed to extablish connection:", err) } else { From eefcbc303598b1871c7480776fbe22452fc49034 Mon Sep 17 00:00:00 2001 From: chadlagore Date: Tue, 25 Jul 2017 23:16:13 -0700 Subject: [PATCH 11/11] rename Set to Push --- app/app.go | 2 +- app/app_test.go | 4 ++-- pool/pool.go | 6 +++--- pool/pool_test.go | 16 ++++++++-------- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/app/app.go b/app/app.go index 5fd96b7..7a62299 100644 --- a/app/app.go +++ b/app/app.go @@ -234,7 +234,7 @@ func (a *App) HandleWork() { // HandleTransaction handles new instance of TransactionWork. func (a *App) HandleTransaction(txn *blockchain.Transaction) { - validTransaction := a.Pool.Set(txn, a.Chain) + validTransaction := a.Pool.Push(txn, a.Chain) if validTransaction { log.Debug("added transaction to pool from address: " + txn.Sender.Repr()) } else { diff --git a/app/app_test.go b/app/app_test.go index 534b406..5d1d355 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -126,7 +126,7 @@ func TestHandleBlockOK(t *testing.T) { // TODO: Start miner. for i < 1000 { - a.Pool.SetUnsafe(blockchain.NewTestTransaction()) + a.Pool.PushUnsafe(blockchain.NewTestTransaction()) i++ } @@ -145,7 +145,7 @@ func TestHandleBlockNotOK(t *testing.T) { // TODO: Start miner. for i < 1000 { - a.Pool.SetUnsafe(blockchain.NewTestTransaction()) + a.Pool.PushUnsafe(blockchain.NewTestTransaction()) i++ } diff --git a/pool/pool.go b/pool/pool.go index 6ba7ac9..f799f7f 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -70,7 +70,7 @@ func getIndex(a []*PooledTransaction, target time.Time, low, high int) int { // Set inserts a transaction into the pool, returning // true if the Transaction was inserted (was valid). -func (p *Pool) Set(t *blockchain.Transaction, bc *blockchain.BlockChain) bool { +func (p *Pool) Push(t *blockchain.Transaction, bc *blockchain.BlockChain) bool { if ok, err := bc.ValidTransaction(t); ok { p.set(t) return true @@ -80,8 +80,8 @@ func (p *Pool) Set(t *blockchain.Transaction, bc *blockchain.BlockChain) bool { } } -// SetUnsafe adds a transaction to the pool without validation. -func (p *Pool) SetUnsafe(t *blockchain.Transaction) { +// PushUnsafe adds a transaction to the pool without validation. +func (p *Pool) PushUnsafe(t *blockchain.Transaction) { p.set(t) } diff --git a/pool/pool_test.go b/pool/pool_test.go index 5a9740f..f0d8d6f 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -14,7 +14,7 @@ func TestGetAndSetTransaction(t *testing.T) { t.FailNow() } tr := b.Transactions[1] - if !p.Set(tr, bc) { + if !p.Push(tr, bc) { t.FailNow() } @@ -36,7 +36,7 @@ func TestGetAndSetTransaction(t *testing.T) { func TestSetBadTransaction(t *testing.T) { p := New() bc := blockchain.NewTestBlockChain() - if p.Set(blockchain.NewTestTransaction(), bc) { + if p.Push(blockchain.NewTestTransaction(), bc) { t.FailNow() } } @@ -50,7 +50,7 @@ func TestUpdatePool(t *testing.T) { } for _, tr := range legitBlk.Transactions[1:] { - p.Set(tr, bc) + p.Push(tr, bc) } if p.Len() == 0 { t.FailNow() @@ -77,9 +77,9 @@ func TestGetIndex(t *testing.T) { p := New() numTxns := 1000 tr := blockchain.NewTestTransaction() - p.SetUnsafe(tr) + p.PushUnsafe(tr) for i := 0; i < numTxns; i++ { - p.SetUnsafe(blockchain.NewTestTransaction()) + p.PushUnsafe(blockchain.NewTestTransaction()) } if p.GetIndex(tr) != 0 { t.FailNow() @@ -98,7 +98,7 @@ func TestNextBlock(t *testing.T) { lastBlk := chain.Blocks[nBlks-1] numTxns := 1000 for i := 0; i < numTxns; i++ { - p.SetUnsafe(blockchain.NewTestTransaction()) + p.PushUnsafe(blockchain.NewTestTransaction()) } b := p.NextBlock(chain, blockchain.NewWallet().Public(), 1<<18) assert.NotNil(t, b) @@ -127,8 +127,8 @@ func TestSetDedupes(t *testing.T) { t1 := blockchain.NewTestTransaction() t2 := blockchain.NewTestTransaction() t1.Input.Hash = t2.Input.Hash - p.SetUnsafe(t1) - p.SetUnsafe(t2) + p.PushUnsafe(t1) + p.PushUnsafe(t2) assert.Equal(t, p.Peek(), t2) assert.Equal(t, p.Len(), 1) }