Skip to content
This repository has been archived by the owner on Jun 10, 2019. It is now read-only.

Commit

Permalink
refactor(state): locks and concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
mvayngrib committed Mar 1, 2019
1 parent ca7a5c7 commit e7fb597
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 85 deletions.
45 changes: 36 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"leveldown": "^4.0.1",
"levelup": "^3.1.1",
"lodash": "^4.17.11",
"multi-lock-queue": "~1.0.1",
"plasma-contracts": "git+https://github.com/plasma-group/plasma-contracts.git",
"plasma-explorer": "git+https://github.com/plasma-group/plasma-explorer.git",
"plasma-utils": "git+https://github.com/plasma-group/plasma-utils.git",
Expand Down
119 changes: 43 additions & 76 deletions src/state-manager/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const log = require('debug')('info:state')
const models = require('plasma-utils').serialization.models
const UnsignedTransaction = models.UnsignedTransaction
const SignedTransaction = models.SignedTransaction
const { createLockingQueue } = require('multi-lock-queue')
const itNext = require('../utils.js').itNext
const itEnd = require('../utils.js').itEnd
const getDepositTransaction = require('../utils.js').getDepositTransaction
Expand All @@ -22,11 +23,6 @@ const BLOCKNUMBER_BYTE_SIZE = require('../constants.js').BLOCKNUMBER_BYTE_SIZE
const DEPOSIT_TX_LENGTH = 73
const RECENT_TX_CACHE_SIZE = 30

// ************* HELPER FUNCTIONS ************* //
const timeout = ms => new Promise(resolve => setTimeout(resolve, ms))
const timeoutAmt = () => 0
// const timeoutAmt = () => Math.floor(Math.random() * 2)

function decodeTransaction (encoding) {
let tx
if (encoding.length === DEPOSIT_TX_LENGTH) {
Expand Down Expand Up @@ -65,6 +61,7 @@ class State {
this.lock = {}
this.recentTransactions = []
this.isCurrentBlockEmpty = true
this.queue = createLockingQueue()
}

async init () {
Expand Down Expand Up @@ -99,17 +96,13 @@ class State {
if (this.isCurrentBlockEmpty) {
throw new Error('Block is empty! Cannot start new block.')
}
if (this.lock.all === true) {
if (this.queue.isPaused()) {
throw new Error('Attempting to start a new block when a global lock is already active')
}
// Start a global lock as we increment the block number. Note that we will have to wait until all other locks are released
this.lock.all = true
// Wait until all other locks are released
while (Object.keys(this.lock).length !== 1) {
log('Waiting to acquire global lock')
await timeout(timeoutAmt())
}
// Everything should be locked now that we have a `lock.all` activated. Time to increment the blockNumber

log('Waiting to acquire global lock')
// Wait for all locks to be released and start a global lock as we increment the block number.
await this.queue.pause()
this.blockNumber = this.blockNumber.add(new BN(1))
// Create a new block
await this.db.put(Buffer.from('blockNumber'), this.blockNumber.toArrayLike(Buffer, 'big', BLOCKNUMBER_BYTE_SIZE))
Expand All @@ -120,35 +113,11 @@ class State {
this.writeStream = fs.createWriteStream(this.tmpTxLogFile, { flags: 'a' })
// Set empty block flag
this.isCurrentBlockEmpty = true
// Release our lock
delete this.lock.all
this.queue.resume()
log('#### Started new Block #', this.blockNumber.toString())
return this.blockNumber
}

attemptAcquireLocks (k) {
const keywords = k.slice() // Make a copy of the array to make sure we don't pollute anything when we add the `all` keyword
log('Attempting to acquire lock for:', keywords)
keywords.push('all')
if (keywords.some((val) => { return this.lock.hasOwnProperty(val) })) {
log('Failed')
// Failed to acquire locks
return false
}
// Acquire locks
for (let i = 0; i < keywords.length - 1; i++) {
this.lock[keywords[i]] = true
}
return true
}

releaseLocks (keywords) {
// Pop off our lock queue
for (const keyword of keywords) {
delete this.lock[keyword]
}
}

async addDeposit (recipient, token, start, end) {
// Check that we haven't already recorded this deposit
try {
Expand All @@ -157,10 +126,14 @@ class State {
return
} catch (err) {
}
while (!this.attemptAcquireLocks([token.toString(16)])) {
// Wait before attempting again
await timeout(timeoutAmt())
}

return this.queue.enqueue({
locks: [token.toString(16)],
fn: () => this._addDeposit(...arguments)
})
}

async _addDeposit (recipient, token, start, end) {
const deposit = getDepositTransaction(Web3.utils.bytesToHex(recipient), token, start, end, this.blockNumber)
const depositEncoded = deposit.encoded
try {
Expand All @@ -174,7 +147,7 @@ class State {
} catch (err) {
throw err
}
this.releaseLocks([recipient, token.toString(16)])

log('Added deposit with token type:', token.toString('hex'), ', start:', start.toString('hex'), 'and end:', end.toString('hex'))
return depositEncoded
}
Expand All @@ -195,19 +168,6 @@ class State {
}
}

async getTransactionLock (tx) {
const senders = tx.transfers.map((transfer) => transfer.sender)
while (!this.attemptAcquireLocks(senders)) {
// Wait before attempting again
await timeout(timeoutAmt())
}
}

async releaseTransactionLock (tx) {
const senders = tx.transfers.map((transfer) => transfer.sender)
this.releaseLocks(senders)
}

validateAffectedRanges (tx) {
// For all affected ranges, check all affected ranges are owned by the correct sender and blockNumber
for (const tr of tx.transfers) {
Expand Down Expand Up @@ -282,20 +242,23 @@ class State {
// Check that the transaction is well formatted
this.validateTransaction(tx)
// Acquire lock on all of the transfer record senders
await this.getTransactionLock(tx)
return this.queue.enqueue({
locks: getTxLocks(tx),
fn: () => this._addTransaction(tx)
})
}

async _addTransaction (tx) {
// Acquire lock on all of the transfer record senders
log('Attempting to add transaction from:', tx.transfers[0].sender)
try {
// Get the ranges which the transaction affects and attach them to the transaction object
await this.addAffectedRangesToTx(tx)
// Check that all of the affected ranges are valid
await this.validateAffectedRanges(tx)
// All checks have passed, now write to the DB
await this.writeTransactionToDB(tx)
} catch (err) {
this.releaseTransactionLock(tx)
throw err
}
this.releaseTransactionLock(tx)

// Get the ranges which the transaction affects and attach them to the transaction object
await this.addAffectedRangesToTx(tx)
// Check that all of the affected ranges are valid
await this.validateAffectedRanges(tx)
// All checks have passed, now write to the DB
await this.writeTransactionToDB(tx)

log('Added transaction from:', tx.transfers[0].recipient)
this.addRecentTransaction(tx)
// Record that this block is not empty
Expand Down Expand Up @@ -342,11 +305,14 @@ class State {
return affectedRanges
}

async getOwnedRanges (address) {
while (!this.attemptAcquireLocks([address])) {
// Wait before attempting again
await timeout(timeoutAmt())
}
getOwnedRanges (address) {
return this.queue.enqueue({
locks: [address],
fn: () => this._getOwnedRanges(address)
})
}

async _getOwnedRanges (address) {
// Get the ranges
const addressBuffer = Buffer.from(Web3.utils.hexToBytes(address))
const it = this.db.iterator({
Expand All @@ -359,7 +325,6 @@ class State {
result = await itNext(it)
}
await itEnd(it)
this.releaseLocks([address])
return ownedRanges
}

Expand Down Expand Up @@ -391,4 +356,6 @@ class State {
}
}

const getTxLocks = tx => tx.transfers.map((transfer) => transfer.sender)

module.exports = State

0 comments on commit e7fb597

Please sign in to comment.