From e7fb597c7e4d4c0dbfe491519ec9a719391c8c46 Mon Sep 17 00:00:00 2001 From: Mark Vayngrib Date: Thu, 28 Feb 2019 21:19:21 -0500 Subject: [PATCH] refactor(state): locks and concurrency --- package-lock.json | 45 +++++++++++--- package.json | 1 + src/state-manager/state.js | 119 ++++++++++++++----------------------- 3 files changed, 80 insertions(+), 85 deletions(-) diff --git a/package-lock.json b/package-lock.json index ee5c6f4..bcc1c29 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "plasma-chain", - "version": "0.0.15", + "version": "0.0.17", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -1047,6 +1047,11 @@ "resolved": "https://registry.npmjs.org/dom-walk/-/dom-walk-0.1.1.tgz", "integrity": "sha1-ZyIm3HTI95mtNTB9+TaroRrNYBg=" }, + "dotenv": { + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-6.2.0.tgz", + "integrity": "sha512-HygQCKUBSFl8wKQZBSemMywRWcEDNidvNbjGVyZu3nbZ8qq9ubiPoGLMdRDpfSrpkkm9BXYFkpKxxFX38o/76w==" + }, "duplexer3": { "version": "0.1.4", "resolved": "https://registry.npmjs.org/duplexer3/-/duplexer3-0.1.4.tgz", @@ -3248,6 +3253,11 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" }, + "multi-lock-queue": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/multi-lock-queue/-/multi-lock-queue-1.0.1.tgz", + "integrity": "sha512-Xi98NjqlklkrFveZ2nFNiP9avkvPmwkB/Wcw3bwJ0GvKkI3XeSNdvzYW9TIX98P+vyioMXlQqdHS7RtDEFlMvA==" + }, "mute-stream": { "version": "0.0.7", "resolved": "https://registry.npmjs.org/mute-stream/-/mute-stream-0.0.7.tgz", @@ -6757,6 +6767,7 @@ "from": "git+https://github.com/plasma-group/plasma-contracts.git", "requires": { "bn.js": "^4.11.8", + "dotenv": "^6.2.0", "ganache-cli": "^6.2.5", "plasma-utils": "^0.0.2", "web3": "1.0.0-beta.37" @@ -6786,8 +6797,8 @@ "i": "^0.3.6", "node-sass": "^4.11.0", "npm": "^6.5.0", - "plasma-js-lib": "github:plasma-group/plasma-js-lib", - "plasma-utils": "github:plasma-group/plasma-utils", + "plasma-js-lib": "^0.0.4-beta.3", + "plasma-utils": "^0.0.4-beta.0", "sass-loader": "^7.1.0", "style-loader": "^0.23.1", "vue": "^2.5.17", @@ -6796,26 +6807,41 @@ }, "dependencies": { "plasma-utils": { - "version": "github:plasma-group/plasma-utils#e910d7485f1cafe646981a03b86e5938154b7508", - "from": "github:plasma-group/plasma-utils", + "version": "0.0.4-beta.2", + "resolved": "github:plasma-group/plasma-utils#e910d7485f1cafe646981a03b86e5938154b7508", "requires": { "@babel/runtime": "^7.3.1", "debug": "^4.1.1", "lodash": "^4.17.11", + "typedarray-to-buffer": "^3.1.5", "web3-eth-accounts": "^1.0.0-beta.38" } } } }, "plasma-js-lib": { - "version": "github:plasma-group/plasma-js-lib#7659295e9f5d495c3564b40b1010022129828252", - "from": "github:plasma-group/plasma-js-lib", + "version": "0.0.4-beta.9", + "resolved": "github:plasma-group/plasma-js-lib#7659295e9f5d495c3564b40b1010022129828252", "requires": { "axios": "^0.18.0", "bn.js": "^4.11.8", "chai": "^4.2.0", "mocha": "^5.2.0", + "plasma-utils": "0.0.4-beta.0", "uuid": "^3.3.2" + }, + "dependencies": { + "plasma-utils": { + "version": "0.0.4-beta.0", + "resolved": "https://registry.npmjs.org/plasma-utils/-/plasma-utils-0.0.4-beta.0.tgz", + "integrity": "sha512-06PRXSFrttNIGvoFm85wXk3fb7spz+w2XkcxYjf5sgoRVkHPDR0bDUfthGASgUBl6FMWG6Rx2IGeH4z0VXbo/Q==", + "requires": { + "@babel/runtime": "^7.3.1", + "debug": "^4.1.1", + "lodash": "^4.17.11", + "web3-eth-accounts": "^1.0.0-beta.38" + } + } } }, "plasma-utils": { @@ -6825,6 +6851,7 @@ "@babel/runtime": "^7.3.1", "debug": "^4.1.1", "lodash": "^4.17.11", + "typedarray-to-buffer": "^3.1.5", "web3-eth-accounts": "^1.0.0-beta.38" } }, @@ -8983,7 +9010,7 @@ "lodash": "^4.17.11", "oboe": "2.1.4", "url-parse": "1.4.4", - "websocket": "git://github.com/frozeman/WebSocket-Node.git#browserifyCompatible", + "websocket": "git://github.com/frozeman/WebSocket-Node.git#6c72925e3f8aaaea8dc8450f97627e85263999f2", "xhr2-cookies": "1.1.0" } }, @@ -9129,7 +9156,7 @@ "requires": { "underscore": "1.8.3", "web3-core-helpers": "1.0.0-beta.37", - "websocket": "git://github.com/frozeman/WebSocket-Node.git#browserifyCompatible" + "websocket": "git://github.com/frozeman/WebSocket-Node.git#6c72925e3f8aaaea8dc8450f97627e85263999f2" }, "dependencies": { "bn.js": { diff --git a/package.json b/package.json index a10333b..afb9c76 100644 --- a/package.json +++ b/package.json @@ -53,6 +53,7 @@ "leveldown": "^4.0.1", "levelup": "^3.1.1", "lodash": "^4.17.11", + "multi-lock-queue": "~1.0.1", "plasma-contracts": "git+https://github.com/plasma-group/plasma-contracts.git", "plasma-explorer": "git+https://github.com/plasma-group/plasma-explorer.git", "plasma-utils": "git+https://github.com/plasma-group/plasma-utils.git", diff --git a/src/state-manager/state.js b/src/state-manager/state.js index 49950f9..c430805 100644 --- a/src/state-manager/state.js +++ b/src/state-manager/state.js @@ -5,6 +5,7 @@ const log = require('debug')('info:state') const models = require('plasma-utils').serialization.models const UnsignedTransaction = models.UnsignedTransaction const SignedTransaction = models.SignedTransaction +const { createLockingQueue } = require('multi-lock-queue') const itNext = require('../utils.js').itNext const itEnd = require('../utils.js').itEnd const getDepositTransaction = require('../utils.js').getDepositTransaction @@ -22,11 +23,6 @@ const BLOCKNUMBER_BYTE_SIZE = require('../constants.js').BLOCKNUMBER_BYTE_SIZE const DEPOSIT_TX_LENGTH = 73 const RECENT_TX_CACHE_SIZE = 30 -// ************* HELPER FUNCTIONS ************* // -const timeout = ms => new Promise(resolve => setTimeout(resolve, ms)) -const timeoutAmt = () => 0 -// const timeoutAmt = () => Math.floor(Math.random() * 2) - function decodeTransaction (encoding) { let tx if (encoding.length === DEPOSIT_TX_LENGTH) { @@ -65,6 +61,7 @@ class State { this.lock = {} this.recentTransactions = [] this.isCurrentBlockEmpty = true + this.queue = createLockingQueue() } async init () { @@ -99,17 +96,13 @@ class State { if (this.isCurrentBlockEmpty) { throw new Error('Block is empty! Cannot start new block.') } - if (this.lock.all === true) { + if (this.queue.isPaused()) { throw new Error('Attempting to start a new block when a global lock is already active') } - // Start a global lock as we increment the block number. Note that we will have to wait until all other locks are released - this.lock.all = true - // Wait until all other locks are released - while (Object.keys(this.lock).length !== 1) { - log('Waiting to acquire global lock') - await timeout(timeoutAmt()) - } - // Everything should be locked now that we have a `lock.all` activated. Time to increment the blockNumber + + log('Waiting to acquire global lock') + // Wait for all locks to be released and start a global lock as we increment the block number. + await this.queue.pause() this.blockNumber = this.blockNumber.add(new BN(1)) // Create a new block await this.db.put(Buffer.from('blockNumber'), this.blockNumber.toArrayLike(Buffer, 'big', BLOCKNUMBER_BYTE_SIZE)) @@ -120,35 +113,11 @@ class State { this.writeStream = fs.createWriteStream(this.tmpTxLogFile, { flags: 'a' }) // Set empty block flag this.isCurrentBlockEmpty = true - // Release our lock - delete this.lock.all + this.queue.resume() log('#### Started new Block #', this.blockNumber.toString()) return this.blockNumber } - attemptAcquireLocks (k) { - const keywords = k.slice() // Make a copy of the array to make sure we don't pollute anything when we add the `all` keyword - log('Attempting to acquire lock for:', keywords) - keywords.push('all') - if (keywords.some((val) => { return this.lock.hasOwnProperty(val) })) { - log('Failed') - // Failed to acquire locks - return false - } - // Acquire locks - for (let i = 0; i < keywords.length - 1; i++) { - this.lock[keywords[i]] = true - } - return true - } - - releaseLocks (keywords) { - // Pop off our lock queue - for (const keyword of keywords) { - delete this.lock[keyword] - } - } - async addDeposit (recipient, token, start, end) { // Check that we haven't already recorded this deposit try { @@ -157,10 +126,14 @@ class State { return } catch (err) { } - while (!this.attemptAcquireLocks([token.toString(16)])) { - // Wait before attempting again - await timeout(timeoutAmt()) - } + + return this.queue.enqueue({ + locks: [token.toString(16)], + fn: () => this._addDeposit(...arguments) + }) + } + + async _addDeposit (recipient, token, start, end) { const deposit = getDepositTransaction(Web3.utils.bytesToHex(recipient), token, start, end, this.blockNumber) const depositEncoded = deposit.encoded try { @@ -174,7 +147,7 @@ class State { } catch (err) { throw err } - this.releaseLocks([recipient, token.toString(16)]) + log('Added deposit with token type:', token.toString('hex'), ', start:', start.toString('hex'), 'and end:', end.toString('hex')) return depositEncoded } @@ -195,19 +168,6 @@ class State { } } - async getTransactionLock (tx) { - const senders = tx.transfers.map((transfer) => transfer.sender) - while (!this.attemptAcquireLocks(senders)) { - // Wait before attempting again - await timeout(timeoutAmt()) - } - } - - async releaseTransactionLock (tx) { - const senders = tx.transfers.map((transfer) => transfer.sender) - this.releaseLocks(senders) - } - validateAffectedRanges (tx) { // For all affected ranges, check all affected ranges are owned by the correct sender and blockNumber for (const tr of tx.transfers) { @@ -282,20 +242,23 @@ class State { // Check that the transaction is well formatted this.validateTransaction(tx) // Acquire lock on all of the transfer record senders - await this.getTransactionLock(tx) + return this.queue.enqueue({ + locks: getTxLocks(tx), + fn: () => this._addTransaction(tx) + }) + } + + async _addTransaction (tx) { + // Acquire lock on all of the transfer record senders log('Attempting to add transaction from:', tx.transfers[0].sender) - try { - // Get the ranges which the transaction affects and attach them to the transaction object - await this.addAffectedRangesToTx(tx) - // Check that all of the affected ranges are valid - await this.validateAffectedRanges(tx) - // All checks have passed, now write to the DB - await this.writeTransactionToDB(tx) - } catch (err) { - this.releaseTransactionLock(tx) - throw err - } - this.releaseTransactionLock(tx) + + // Get the ranges which the transaction affects and attach them to the transaction object + await this.addAffectedRangesToTx(tx) + // Check that all of the affected ranges are valid + await this.validateAffectedRanges(tx) + // All checks have passed, now write to the DB + await this.writeTransactionToDB(tx) + log('Added transaction from:', tx.transfers[0].recipient) this.addRecentTransaction(tx) // Record that this block is not empty @@ -342,11 +305,14 @@ class State { return affectedRanges } - async getOwnedRanges (address) { - while (!this.attemptAcquireLocks([address])) { - // Wait before attempting again - await timeout(timeoutAmt()) - } + getOwnedRanges (address) { + return this.queue.enqueue({ + locks: [address], + fn: () => this._getOwnedRanges(address) + }) + } + + async _getOwnedRanges (address) { // Get the ranges const addressBuffer = Buffer.from(Web3.utils.hexToBytes(address)) const it = this.db.iterator({ @@ -359,7 +325,6 @@ class State { result = await itNext(it) } await itEnd(it) - this.releaseLocks([address]) return ownedRanges } @@ -391,4 +356,6 @@ class State { } } +const getTxLocks = tx => tx.transfers.map((transfer) => transfer.sender) + module.exports = State