diff --git a/app/app.go b/app/app.go index 6c3d726..ac47ea9 100644 --- a/app/app.go +++ b/app/app.go @@ -13,23 +13,38 @@ import ( "github.com/ubclaunchpad/cumulus/blockchain" "github.com/ubclaunchpad/cumulus/conf" "github.com/ubclaunchpad/cumulus/conn" + "github.com/ubclaunchpad/cumulus/miner" "github.com/ubclaunchpad/cumulus/msg" "github.com/ubclaunchpad/cumulus/peer" "github.com/ubclaunchpad/cumulus/pool" ) var ( - chain *blockchain.BlockChain logFile = os.Stdout - // A reference to the transaction pool - tpool *pool.Pool +) + +const ( + blockQueueSize = 100 + transactionQueueSize = 100 ) // App contains information about a running instance of a Cumulus node type App struct { - PeerStore *peer.PeerStore + CurrentUser *User + PeerStore *peer.PeerStore + Chain *blockchain.BlockChain + Pool *pool.Pool } +// BlockWorkQueue is a queue of blocks to process. +var blockQueue = make(chan *blockchain.Block, blockQueueSize) + +// TransactionWorkQueue is a queue of transactions to process. +var transactionQueue = make(chan *blockchain.Transaction, transactionQueueSize) + +// QuitChan kills the app worker. +var quitChan = make(chan bool) + // Run sets up and starts a new Cumulus node with the // given configuration. This should only be called once (except in tests) func Run(cfg conf.Config) { @@ -38,7 +53,10 @@ func Run(cfg conf.Config) { addr := fmt.Sprintf("%s:%d", config.Interface, config.Port) a := App{ - PeerStore: peer.NewPeerStore(addr), + PeerStore: peer.NewPeerStore(addr), + CurrentUser: getCurrentUser(), + Chain: getLocalChain(), + Pool: getLocalPool(), } // Set logging level @@ -65,9 +83,8 @@ func Run(cfg conf.Config) { }() // Below we'll connect to peers. After which, requests could begin to - // stream in. We should first initalize our pool, workers to handle - // incoming messages. - initializeNode() + // stream in. Kick off a worker to handle requests and pushes. + go a.HandleWork() // Set Peer default Push and Request handlers. These functions will handle // request and push messages from all peers we connect to unless overridden @@ -90,12 +107,6 @@ func Run(cfg conf.Config) { // peer is running go a.PeerStore.MaintainConnections(wg) - // Request the blockchain. - if chain == nil { - log.Info("Request blockchain from peers not yet implemented.") - initializeChain() - } - // Wait for goroutines to start wg.Wait() @@ -110,7 +121,7 @@ func Run(cfg conf.Config) { } log.Info("Redirecting logs to logfile") log.SetOutput(logFile) - go RunConsole(a.PeerStore) + go RunConsole(a.PeerStore, &a) } if len(config.Target) > 0 { @@ -153,7 +164,8 @@ func (a *App) RequestHandler(req *msg.Request) msg.Response { blockNumber, ok := req.Params["blockNumber"].(uint32) if ok { // If its ok, we make try to a copy of it. - blk, err := chain.CopyBlockByIndex(blockNumber) + // TODO: Make this CopyBlockByHash. + blk, err := a.Chain.CopyBlockByIndex(blockNumber) if err != nil { // Bad index parameter. res.Error = notFoundErr @@ -180,7 +192,7 @@ func (a *App) PushHandler(push *msg.Push) { blk, ok := push.Resource.(*blockchain.Block) if ok { log.Info("Adding block to work queue.") - BlockWorkQueue <- BlockWork{blk, nil} + blockQueue <- blk } else { log.Error("Could not cast resource to block.") } @@ -188,7 +200,7 @@ func (a *App) PushHandler(push *msg.Push) { txn, ok := push.Resource.(*blockchain.Transaction) if ok { log.Info("Adding transaction to work queue.") - TransactionWorkQueue <- TransactionWork{txn, nil} + transactionQueue <- txn } else { log.Error("Could not cast resource to transaction.") } @@ -197,42 +209,59 @@ func (a *App) PushHandler(push *msg.Push) { } } -// initializeNode creates a transaction pool, workers and queues to handle -// incoming messages. -func initializeNode() { - tpool = pool.New() - intializeQueues() - initializeWorkers() +// getLocalChain returns an instance of the blockchain. +func getLocalChain() *blockchain.BlockChain { + // TODO: Look for local chain on disk. If doesn't exist, go rummaging + // around on the internets for one. + bc, _ := blockchain.NewValidTestChainAndBlock() + return bc } -// intializeQueues makes all necessary queues. -func intializeQueues() { - BlockWorkQueue = make(chan BlockWork, BlockQueueSize) - TransactionWorkQueue = make(chan TransactionWork, TransactionQueueSize) - QuitChan = make(chan int) +// getLocalPool returns an instance of the pool. +func getLocalPool() *pool.Pool { + // TODO: Look for local pool on disk. If doesn't exist, make a new one. + return pool.New() } -// initializeWorkers kicks off workers to handle incoming requests. -func initializeWorkers() { - for i := 0; i < nWorkers; i++ { - log.WithFields(log.Fields{"id": i}).Debug("Starting worker. ") - worker := NewWorker(i) - worker.Start() - workers[i] = &worker +// HandleWork continually collects new work from existing work channels. +func (a *App) HandleWork() { + log.Debug("Worker waiting for work.") + for { + select { + case work := <-transactionQueue: + a.HandleTransaction(work) + case work := <-blockQueue: + a.HandleBlock(work) + case <-quitChan: + return + } } } -// initializeChain creates the blockchain for the node. -func initializeChain() { - chain, _ = blockchain.NewValidTestChainAndBlock() - // TODO: Check if chain exists on disk. - // TODO: If not, request chain from peers. +// HandleTransaction handles new instance of TransactionWork. +func (a *App) HandleTransaction(txn *blockchain.Transaction) { + validTransaction := a.Pool.Set(txn, a.Chain) + if validTransaction { + log.Debug("added transaction to pool from address: " + txn.Sender.Repr()) + } else { + log.Debug("bad transaction rejected from sender: " + txn.Sender.Repr()) + } } -// killWorkers kills all workers. -func killWorkers() { - for i := 0; i < nWorkers; i++ { - QuitChan <- i - workers[i] = nil +// HandleBlock handles new instance of BlockWork. +func (a *App) HandleBlock(blk *blockchain.Block) { + validBlock := a.Pool.Update(blk, a.Chain) + + if validBlock { + // Append to the chain before requesting + // the next block so that the block + // numbers make sense. + a.Chain.AppendBlock(blk) + address := a.CurrentUser.Wallet.Public() + blk := a.Pool.NextBlock(a.Chain, address, a.CurrentUser.BlockSize) + if miner.IsMining() { + miner.RestartMiner(a.Chain, blk) + } + log.Debug("added blk number %d to chain", blk.BlockNumber) } } diff --git a/app/app_test.go b/app/app_test.go index 6e1083b..2a0140a 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -2,167 +2,97 @@ package app import ( "testing" - "time" log "github.com/Sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/ubclaunchpad/cumulus/blockchain" "github.com/ubclaunchpad/cumulus/msg" - "github.com/ubclaunchpad/cumulus/peer" ) func init() { - log.SetLevel(log.InfoLevel) -} - -func createNewBlockRequest(blockNumber interface{}) *msg.Request { - params := make(map[string]interface{}, 1) - params["blockNumber"] = blockNumber - return &msg.Request{ - ResourceType: msg.ResourceBlock, - Params: params, - } -} - -func TestKillWorkers(t *testing.T) { - intializeQueues() - time.Sleep(20 * time.Millisecond) - for i := 0; i < nWorkers; i++ { - if workers[i] != nil { - t.FailNow() - } - } - initializeWorkers() - time.Sleep(20 * time.Millisecond) - for i := 0; i < nWorkers; i++ { - if workers[i] == nil { - t.FailNow() - } - } - killWorkers() - time.Sleep(20 * time.Millisecond) - for i := 0; i < nWorkers; i++ { - if workers[i] != nil { - t.FailNow() - } - } -} - -func TestInitializeNode(t *testing.T) { - initializeNode() - if tpool == nil { - t.FailNow() - } - if BlockWorkQueue == nil { - t.FailNow() - } - if TransactionWorkQueue == nil { - t.FailNow() - } - killWorkers() + log.SetLevel(log.DebugLevel) } func TestPushHandlerNewBlock(t *testing.T) { - intializeQueues() + // Should put a block in the blockWorkQueue. + a := createNewTestApp() _, b := blockchain.NewValidTestChainAndBlock() push := msg.Push{ ResourceType: msg.ResourceBlock, Resource: b, } - a := App{ - PeerStore: peer.NewPeerStore("127.0.0.1:8000"), - } a.PushHandler(&push) select { - case work, ok := <-BlockWorkQueue: - if !ok { - t.FailNow() - } - if work.Block != b { - t.FailNow() - } + case blk, ok := <-blockQueue: + assert.True(t, ok) + assert.Equal(t, blk, b) } - // Add more here... } func TestPushHandlerNewTestTransaction(t *testing.T) { - intializeQueues() + // Should put a transaction in the transactionWorkQueue. + a := createNewTestApp() txn := blockchain.NewTestTransaction() push := msg.Push{ ResourceType: msg.ResourceTransaction, Resource: txn, } - a := App{ - PeerStore: peer.NewPeerStore("127.0.0.1:8000"), - } a.PushHandler(&push) select { - case work, ok := <-TransactionWorkQueue: - if !ok { - t.FailNow() - } - if work.Transaction != txn { - t.FailNow() - } + case tr, ok := <-transactionQueue: + assert.True(t, ok) + assert.Equal(t, tr, txn) } - // Add more here... } -func TestRequestHandlerNewBlockOK(t *testing.T) { - initializeChain() - a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} +// TODO: Enable once block request by hash implemented. +// func TestRequestHandlerNewBlockOK(t *testing.T) { +// // Request a new block by hash and verify we get the right one. +// a := createNewTestApp() - // Set up a request (requesting block 0) - blockNumber := uint32(0) - req := createNewBlockRequest(blockNumber) +// req := createNewTestBlockRequest(a.Chain.Blocks[1].LastBlock) +// resp := a.RequestHandler(req) +// block, ok := resp.Resource.(*blockchain.Block) - resp := a.RequestHandler(req) - block, ok := resp.Resource.(*blockchain.Block) - - // Assertion time! - assert.True(t, ok, "resource should contain block") - assert.Equal(t, block.BlockNumber, blockNumber, - "block number should be "+string(blockNumber)) -} +// assert.True(t, ok, "resource should contain block") +// assert.Equal(t, block, a.Chain.Blocks[1]) +// } func TestRequestHandlerNewBlockBadParams(t *testing.T) { - initializeChain() - a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} + a := createNewTestApp() // Set up a request. - blockNumber := "definitelynotanindex" - req := createNewBlockRequest(blockNumber) + hash := "definitelynotahash" + req := createNewTestBlockRequest(hash) resp := a.RequestHandler(req) - block, ok := resp.Resource.(*blockchain.Block) + _, ok := resp.Resource.(*blockchain.Block) // Make sure request failed. assert.False(t, ok, "resource should not contain block") - assert.Nil(t, block, "resource should not contain block") + assert.Equal(t, msg.ResourceNotFound, int(resp.Error.Code), resp.Error.Message) } func TestRequestHandlerNewBlockBadType(t *testing.T) { - initializeChain() - a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} + a := createNewTestApp() // Set up a request. - req := createNewBlockRequest("doesntmatter") + req := createNewTestBlockRequest("doesntmatter") req.ResourceType = 25 resp := a.RequestHandler(req) - block, ok := resp.Resource.(*blockchain.Block) + _, ok := resp.Resource.(*blockchain.Block) // Make sure request failed. assert.False(t, ok, "resource should not contain block") - assert.Nil(t, block, "resource should not contain block") + assert.Equal(t, msg.InvalidResourceType, int(resp.Error.Code), resp.Error.Message) } func TestRequestHandlerPeerInfo(t *testing.T) { - initializeChain() - a := App{PeerStore: peer.NewPeerStore("127.0.0.1:8000")} + a := createNewTestApp() // Set up a request. - req := createNewBlockRequest("doesntmatter") + req := createNewTestBlockRequest("doesntmatter") req.ResourceType = msg.ResourcePeerInfo resp := a.RequestHandler(req) @@ -172,3 +102,54 @@ func TestRequestHandlerPeerInfo(t *testing.T) { assert.NotNil(t, res, "resource should contain peer info") // Assert peer address returned valid. } + +func TestHandleTransactionOK(t *testing.T) { + a := createNewTestApp() + bc, blk := blockchain.NewValidTestChainAndBlock() + a.Chain = bc + txn := blk.Transactions[1] + a.HandleTransaction(txn) + assert.False(t, a.Pool.Empty()) + assert.Equal(t, a.Pool.Peek(), txn) +} + +func TestHandleTransactionNotOK(t *testing.T) { + a := createNewTestApp() + a.HandleTransaction(blockchain.NewTestTransaction()) + assert.True(t, a.Pool.Empty()) +} + +func TestHandleBlockOK(t *testing.T) { + a := createNewTestApp() + i := 0 + + // TODO: Start miner. + + for i < 1000 { + a.Pool.SetUnsafe(blockchain.NewTestTransaction()) + i++ + } + + bc, blk := blockchain.NewValidTestChainAndBlock() + a.Chain = bc + a.HandleBlock(blk) + assert.Equal(t, blk, a.Chain.Blocks[2]) + + // TODO: Assert miner restarted. + // TODO: Assert pool appropriately emptied. +} + +func TestHandleBlockNotOK(t *testing.T) { + a := createNewTestApp() + i := 0 + + // TODO: Start miner. + for i < 1000 { + a.Pool.SetUnsafe(blockchain.NewTestTransaction()) + i++ + } + + a.HandleBlock(blockchain.NewTestBlock()) + // TODO: Assert miner not restarted. + // TODO: Assert pool untouched. +} diff --git a/app/console.go b/app/console.go index f92b669..dcbef47 100644 --- a/app/console.go +++ b/app/console.go @@ -18,14 +18,16 @@ var ( // goroutine, and logging should be redirected away from stdout before it is run. // It takes a pointer to a PeerStore so we can use the PeerStore to interact // with other peers and give the user info about the running instance. -func RunConsole(ps *peer.PeerStore) { +func RunConsole(ps *peer.PeerStore, app *App) { peerStore = ps shell = ishell.New() shell.AddCmd(&ishell.Cmd{ Name: "create", Help: "create a new wallet hash or transaction", - Func: create, + Func: func(ctx *ishell.Context) { + create(ctx, app) + }, }) shell.AddCmd(&ishell.Cmd{ Name: "check", @@ -35,30 +37,36 @@ func RunConsole(ps *peer.PeerStore) { shell.AddCmd(&ishell.Cmd{ Name: "address", Help: "show the address this host is listening on", - Func: listenAddr, + Func: func(ctx *ishell.Context) { + listenAddr(ctx, peerStore) + }, }) shell.AddCmd(&ishell.Cmd{ Name: "peers", Help: "show the peers this host is connected to", - Func: peers, + Func: func(ctx *ishell.Context) { + peers(ctx, peerStore) + }, }) shell.AddCmd(&ishell.Cmd{ Name: "connect", Help: "connect to another peer", - Func: connect, + Func: func(ctx *ishell.Context) { + connect(ctx, peerStore) + }, }) shell.Start() emoji.Println(":cloud: Welcome to the :sunny: Cumulus console :cloud:") } -func create(ctx *ishell.Context) { +func create(ctx *ishell.Context, app *App) { choice := ctx.MultiChoice([]string{ "Wallet", "Transaction", }, "What would you like to create?") if choice == 0 { - createHotWallet(ctx) + createHotWallet(ctx, app) } else { shell.Print("Sender wallet ID: ") senderID := shell.ReadLine() @@ -89,15 +97,15 @@ func check(ctx *ishell.Context) { } } -func listenAddr(ctx *ishell.Context) { +func listenAddr(ctx *ishell.Context, peerStore *peer.PeerStore) { shell.Println("Listening on", peerStore.ListenAddr) } -func peers(tcx *ishell.Context) { +func peers(tcx *ishell.Context, peerStore *peer.PeerStore) { shell.Println("Connected to", peerStore.Addrs()) } -func connect(ctx *ishell.Context) { +func connect(ctx *ishell.Context, peerStore *peer.PeerStore) { if len(ctx.Args) == 0 { shell.Println("Usage: connect [IP address]:[TCP port]") return @@ -112,11 +120,11 @@ func connect(ctx *ishell.Context) { } } -func createHotWallet(ctx *ishell.Context) { +func createHotWallet(ctx *ishell.Context, app *App) { shell.Print("Enter wallet name: ") walletName := shell.ReadLine() wallet := HotWallet{walletName, blockchain.NewWallet()} - currentUser.HotWallet = wallet + app.CurrentUser.HotWallet = wallet emoji.Println(":credit_card: New hot wallet created!") emoji.Println(":raising_hand: Name: " + wallet.Name) emoji.Println(":mailbox: Address: " + wallet.Wallet.Public().Repr()) diff --git a/app/test_utils.go b/app/test_utils.go new file mode 100644 index 0000000..010a521 --- /dev/null +++ b/app/test_utils.go @@ -0,0 +1,27 @@ +package app + +import ( + "github.com/ubclaunchpad/cumulus/blockchain" + "github.com/ubclaunchpad/cumulus/msg" + "github.com/ubclaunchpad/cumulus/peer" + "github.com/ubclaunchpad/cumulus/pool" +) + +func createNewTestBlockRequest(lastBlock interface{}) *msg.Request { + params := make(map[string]interface{}, 1) + params["lastBlock"] = lastBlock + return &msg.Request{ + ResourceType: msg.ResourceBlock, + Params: params, + } +} + +func createNewTestApp() *App { + chain, _ := blockchain.NewValidTestChainAndBlock() + return &App{ + PeerStore: peer.NewPeerStore("127.0.0.1:8000"), + CurrentUser: NewUser(), + Chain: chain, + Pool: pool.New(), + } +} diff --git a/app/user.go b/app/user.go index 8d00f7f..8f9a490 100644 --- a/app/user.go +++ b/app/user.go @@ -14,15 +14,6 @@ type HotWallet struct { blockchain.Wallet } -var currentUser *User - -const defaultBlockSize = 1 << 18 - -func init() { - // Temporary to create a new user for testing. - currentUser = NewUser() -} - // NewUser creates a new user func NewUser() *User { return &User{ @@ -30,11 +21,13 @@ func NewUser() *User { Wallet: blockchain.NewWallet(), Name: "default", }, - BlockSize: defaultBlockSize, + BlockSize: blockchain.DefaultBlockSize, } } -// GetCurrentUser returns the current user. -func GetCurrentUser() *User { - return currentUser +// getCurrentUser gets the current user. +func getCurrentUser() *User { + // TODO: Check for local user information on disk, + // If doesnt exist, create new user. + return NewUser() } diff --git a/app/work.go b/app/work.go deleted file mode 100644 index fd8731a..0000000 --- a/app/work.go +++ /dev/null @@ -1,37 +0,0 @@ -package app - -import "github.com/ubclaunchpad/cumulus/blockchain" - -const ( - // BlockQueueSize is the size of the BlockQueue channel. - BlockQueueSize = 100 - // TransactionQueueSize is the size of the BlockQueue channel. - TransactionQueueSize = 100 -) - -// Responder is used to handle requests who require a response. -type Responder interface { - Send(ok bool) - Lock() - Unlock() -} - -// BlockWorkQueue is a queue of blocks to process. -var BlockWorkQueue = make(chan BlockWork, BlockQueueSize) - -// TransactionWorkQueue is a queue of transactions to process. -var TransactionWorkQueue = make(chan TransactionWork, TransactionQueueSize) - -// TransactionWork holds a new transaction job, and a Responder for -// sending results. -type TransactionWork struct { - *blockchain.Transaction - Responder -} - -// BlockWork holds a new block job, and a Responder for -// sending results. -type BlockWork struct { - *blockchain.Block - Responder -} diff --git a/app/worker.go b/app/worker.go deleted file mode 100644 index 2e4bbcf..0000000 --- a/app/worker.go +++ /dev/null @@ -1,98 +0,0 @@ -package app - -import ( - log "github.com/Sirupsen/logrus" - "github.com/ubclaunchpad/cumulus/miner" -) - -// nWorkers is how many workers this node has. -const nWorkers = 10 - -// Workers is a list of workers. -var workers [nWorkers]*AppWorker - -// WorkerQueue is the channel of workers handling work. -var WorkerQueue chan AppWorker - -// QuitChan is the channel we use to kill the workers. -var QuitChan chan int - -// Worker is an interface for the basic app worker tasks. -type Worker interface { - HandleTransaction(work TransactionWork) - HandleBlock(work BlockWork) -} - -// AppWorker implements the basic worker. -type AppWorker struct { - ID int - log.FieldLogger -} - -// NewWorker returns a new AppWorker object. -func NewWorker(id int) AppWorker { - return AppWorker{ - ID: id, - FieldLogger: log.WithField("id", id), - } -} - -// Start continually collects new work from existing work channels. -func (w AppWorker) Start() { - go func() { - for { - // Wait for work. - log.WithFields(log.Fields{ - "id": w.ID, - }).Debug("Worker waiting for work") - select { - case work := <-TransactionWorkQueue: - w.FieldLogger.Debug("Worker handling new transaction work") - w.HandleTransaction(work) - case work := <-BlockWorkQueue: - w.FieldLogger.Debug("Worker handling new block work") - w.HandleBlock(work) - case <-QuitChan: - w.FieldLogger.Debug("Worker quitting") - return - } - } - }() -} - -// HandleTransaction handles new instance of TransactionWork. -func (w *AppWorker) HandleTransaction(work TransactionWork) { - validTransaction := tpool.Set(work.Transaction, chain) - - // Respond to the request if a response method was provided. - if work.Responder != nil { - work.Responder.Lock() - defer work.Responder.Unlock() - work.Responder.Send(validTransaction) - } -} - -// HandleBlock handles new instance of BlockWork. -func (w *AppWorker) HandleBlock(work BlockWork) { - validBlock := tpool.Update(work.Block, chain) - - if validBlock { - user := GetCurrentUser() - // Append to the chain before requesting - // the next block so that the block - // numbers make sense. - chain.AppendBlock(work.Block) - address := user.Wallet.Public() - blk := tpool.NextBlock(chain, address, user.BlockSize) - if miner.IsMining() { - miner.RestartMiner(chain, blk) - } - } - - // Respond to the request if a response method was provided. - if work.Responder != nil { - work.Responder.Lock() - defer work.Responder.Unlock() - work.Responder.Send(validBlock) - } -} diff --git a/app/worker_test.go b/app/worker_test.go deleted file mode 100644 index 7d03747..0000000 --- a/app/worker_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package app - -import ( - "sync" - "testing" - "time" - - bc "github.com/ubclaunchpad/cumulus/blockchain" - "github.com/ubclaunchpad/cumulus/pool" -) - -var ( - legitBlock *bc.Block - legitTransaction *bc.Transaction - realWorker AppWorker - txnWork TransactionWork - mockResponder MockResponder - badTxnWork TransactionWork - goodTxnWork TransactionWork - badBlkWork BlockWork - goodBlkWork BlockWork - badTxn *bc.Transaction - badBlk *bc.Block -) - -type MockResponder struct { - Result bool - NumCalls int - *sync.Mutex -} - -func (r *MockResponder) Send(ok bool) { - r.Result = ok - r.NumCalls++ -} - -func (r *MockResponder) Lock() { - r.Mutex.Lock() -} - -func (r *MockResponder) Unlock() { - r.Mutex.Unlock() -} - -func init() { - badTxn = bc.NewTestTransaction() - badBlk = bc.NewTestBlock() -} - -func reset() { - tpool = pool.New() - chain, legitBlock = bc.NewValidTestChainAndBlock() - legitTransaction = legitBlock.Transactions[1] - realWorker = NewWorker(7) - mockResponder = MockResponder{ - Mutex: &sync.Mutex{}, - Result: false, - } - goodTxnWork = TransactionWork{ - Transaction: legitTransaction, - Responder: &mockResponder, - } - badTxnWork = TransactionWork{ - Transaction: badTxn, - Responder: &mockResponder, - } - goodBlkWork = BlockWork{ - Block: legitBlock, - Responder: &mockResponder, - } - badBlkWork = BlockWork{ - Block: badBlk, - Responder: &mockResponder, - } - QuitChan = make(chan int) -} - -func TestNewWorker(t *testing.T) { - reset() - if realWorker.ID != 7 { - t.FailNow() - } -} - -func TestHandleTransactionOK(t *testing.T) { - reset() - realWorker.HandleTransaction(goodTxnWork) - if mockResponder.Result != true { - t.FailNow() - } -} - -func TestHandleTransactionNotOK(t *testing.T) { - reset() - realWorker.HandleTransaction(badTxnWork) - if mockResponder.Result != false { - t.FailNow() - } -} - -func TestHandleBlockOK(t *testing.T) { - reset() - realWorker.HandleBlock(goodBlkWork) - if mockResponder.Result != true { - t.FailNow() - } -} - -func TestHandleBlockNotOK(t *testing.T) { - reset() - realWorker.HandleBlock(badBlkWork) - if mockResponder.Result != false { - t.FailNow() - } -} - -func TestStartTxn(t *testing.T) { - reset() - realWorker.Start() - TransactionWorkQueue <- goodTxnWork - time.Sleep(50 * time.Millisecond) - mockResponder.Lock() - if !mockResponder.Result { - t.FailNow() - } - mockResponder.Unlock() -} - -func TestStartBlk(t *testing.T) { - reset() - realWorker.Start() - BlockWorkQueue <- goodBlkWork - time.Sleep(50 * time.Millisecond) - mockResponder.Lock() - if !mockResponder.Result { - t.FailNow() - } - mockResponder.Unlock() -} - -func TestQuitWorker(t *testing.T) { - reset() - for i := 0; i < nWorkers; i++ { - NewWorker(i).Start() - } - - // Would hang if quit call fails, and travis would fail. - for i := 0; i < nWorkers; i++ { - QuitChan <- i - } -} diff --git a/blockchain/block.go b/blockchain/block.go index 4d7a9d8..3d8bfd4 100644 --- a/blockchain/block.go +++ b/blockchain/block.go @@ -9,6 +9,9 @@ import ( "github.com/ubclaunchpad/cumulus/common/util" ) +// DefaultBlockSize is the default block size, can be augmented by the user. +const DefaultBlockSize = 1 << 18 + // BlockHeader contains metadata about a block type BlockHeader struct { // BlockNumber is the position of the block within the blockchain diff --git a/pool/pool.go b/pool/pool.go index e90400a..6ba7ac9 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -3,6 +3,8 @@ package pool import ( "time" + log "github.com/Sirupsen/logrus" + "github.com/ubclaunchpad/cumulus/blockchain" "github.com/ubclaunchpad/cumulus/common/util" "github.com/ubclaunchpad/cumulus/miner" @@ -33,6 +35,11 @@ func (p *Pool) Len() int { return len(p.ValidTransactions) } +// Empty returns true if the pool has exactly zero transactions in it. +func (p *Pool) Empty() bool { + return p.Len() == 0 +} + // Get returns the tranasction with input transaction Hash h. func (p *Pool) Get(h blockchain.Hash) *blockchain.Transaction { return p.ValidTransactions[h].Transaction @@ -64,11 +71,13 @@ func getIndex(a []*PooledTransaction, target time.Time, low, high int) int { // Set inserts a transaction into the pool, returning // true if the Transaction was inserted (was valid). func (p *Pool) Set(t *blockchain.Transaction, bc *blockchain.BlockChain) bool { - if ok, _ := bc.ValidTransaction(t); ok { + if ok, err := bc.ValidTransaction(t); ok { p.set(t) return true + } else { + log.Debug(err) + return false } - return false } // SetUnsafe adds a transaction to the pool without validation. @@ -77,7 +86,12 @@ func (p *Pool) SetUnsafe(t *blockchain.Transaction) { } // Silently adds a transaction to the pool. +// Deletes a transaction if it exists from the same +// input hash. func (p *Pool) set(t *blockchain.Transaction) { + if txn, ok := p.ValidTransactions[t.Input.Hash]; ok { + p.Delete(txn.Transaction) + } vt := &PooledTransaction{ Transaction: t, Time: time.Now(), diff --git a/pool/pool_test.go b/pool/pool_test.go index 6710deb..5a9740f 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -121,3 +121,14 @@ func TestPop(t *testing.T) { p := New() assert.Nil(t, p.Pop()) } + +func TestSetDedupes(t *testing.T) { + p := New() + t1 := blockchain.NewTestTransaction() + t2 := blockchain.NewTestTransaction() + t1.Input.Hash = t2.Input.Hash + p.SetUnsafe(t1) + p.SetUnsafe(t2) + assert.Equal(t, p.Peek(), t2) + assert.Equal(t, p.Len(), 1) +}