Use executing set to avoid re-execution.
calvogenerico committed Jun 21, 2022
1 parent 32e919a commit cc3cf66
Showing 15 changed files with 272 additions and 210 deletions.
1 change: 0 additions & 1 deletion db-migrations/20220310174927_initial_migration.js
@@ -23,7 +23,6 @@ exports.up = async (knex) => {

await knex.schema.createTable('berry', t => {
t.text('location').notNullable().primary()
t.jsonb('state').notNullable()
t.text('class')
t.text('scripthash')
t.text('lock')
4 changes: 3 additions & 1 deletion src/bin/start-execution-worker.js
@@ -23,6 +23,7 @@ const {
RabbitQueue,
ExecutionWorker
} = require('../index')
const { ExecutingSet } = require('../executing-set')

const logger = console
const network = NETWORK
@@ -60,7 +61,8 @@ const executor = new Executor(network, WORKERS, blobs, ds, logger, {
BLOB_DB_CONNECTION_URI: BLOB_DB_CONNECTION_URI
}
})
const indexer = new Indexer(null, ds, blobs, trustList, executor, network, logger)
const execSet = new ExecutingSet(ds)
const indexer = new Indexer(ds, blobs, trustList, executor, network, execSet, logger)
let execQueue = null
let trustQueue = null
let rabbitConnection = null
1 change: 1 addition & 0 deletions src/data-sources/columns.js
@@ -50,6 +50,7 @@ const JIG = {
const BERRY = {
NAME: 'berry',
location: 'location',
klass: 'class',
state: 'state'
}

27 changes: 11 additions & 16 deletions src/data-sources/knex-datasource.js
@@ -222,13 +222,18 @@ class KnexDatasource {
}

async removeTxFromExecuting (txid) {
await this.knex(EXECUTING.NAME).where(EXECUTING.txid, txid).del()
return await this.knex(EXECUTING.NAME).where(EXECUTING.txid, txid).del()
}

async findAllExecutingTxids () {
return this.knex(EXECUTING.NAME).pluck(EXECUTING.txid)
}

async checkExecuting (txid) {
const result = await this.knex(EXECUTING.NAME).where({ txid }).pluck(EXECUTING.txid)
return result.length > 0
}

async txidTrustedAndReadyToExecute (txid) {
const mainTx = 'mainTx'
const knex = this.knex
@@ -435,22 +440,12 @@ class KnexDatasource {
}
}

async setJigState (location, stateObject) {
await this.knex(JIG.NAME)
.insert({ [JIG.location]: location, [JIG.state]: JSON.stringify(stateObject) })
.onConflict().ignore()
}

async setBerryState (location, stateObject) {
async setBerryMetadata (location, klass) {
await this.knex(BERRY.NAME)
.insert({ [BERRY.location]: location, [BERRY.state]: JSON.stringify(stateObject) })
.onConflict().ignore()
}

async setBerryMetadata (location) {
await this.knex(BERRY.NAME)
.insert({ [BERRY.location]: location })
.onConflict().ignore()
.insert({
[BERRY.location]: location,
[BERRY.klass]: klass
})
}

async getBerryState (location) {
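A minimal usage sketch of the new datasource methods, not part of the diff: it assumes the db-migrations have been applied and mirrors the constructor arguments used in the tests.

// Sketch under assumptions: in-memory SQLite with the executing table migrated.
const knex = require('knex')
const { KnexDatasource } = require('../src/data-sources/knex-datasource')

async function demo () {
  const knexInstance = knex({
    client: 'sqlite3',
    connection: { filename: ':memory:' },
    useNullAsDefault: true
  })
  const ds = new KnexDatasource(knexInstance, console, false)

  await ds.markTxAsExecuting('sometxid')            // row goes into the executing table
  console.log(await ds.checkExecuting('sometxid'))  // true: pluck() found the txid
  const deleted = await ds.removeTxFromExecuting('sometxid')
  console.log(deleted)                              // knex .del() resolves to the number of deleted rows
  console.log(await ds.checkExecuting('sometxid'))  // false
}

demo().catch(console.error)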
23 changes: 23 additions & 0 deletions src/executing-set.js
@@ -0,0 +1,23 @@
class ExecutingSet {
constructor (ds) {
this.ds = ds
}

check (txid) {
return this.ds.checkExecuting(txid)
}

all () {
return this.ds.findAllExecutingTxids()
}

add (txid) {
return this.ds.markTxAsExecuting(txid)
}

remove (txid) {
return this.ds.removeTxFromExecuting(txid)
}
}

module.exports = { ExecutingSet }
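The class is a thin wrapper over the datasource. A minimal usage sketch, not part of the diff; ds is any object exposing markTxAsExecuting, checkExecuting, findAllExecutingTxids and removeTxFromExecuting (for example a KnexDatasource), and the txid is made up.

const { ExecutingSet } = require('../src/executing-set')

async function sketch (ds) {
  const execSet = new ExecutingSet(ds)
  const txid = 'someTxidHex' // hypothetical txid

  if (!(await execSet.check(txid))) {
    await execSet.add(txid)        // mark the tx as in flight so it is not enqueued twice
  }
  console.log(await execSet.all()) // every txid currently marked as executing
  await execSet.remove(txid)       // clear the mark once execution finishes
}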
10 changes: 8 additions & 2 deletions src/execution-manager.js
@@ -1,8 +1,9 @@
class ExecutionManager {
constructor (blobs, execQueue, trustQueue) {
constructor (blobs, execQueue, trustQueue, executingSet) {
this.blobs = blobs
this.execQueue = execQueue
this.trustQueue = trustQueue
this.executingSet = executingSet
this.execRQueue = null
this.trustRQueue = null
this.subscription = null
@@ -17,7 +18,10 @@ class ExecutionManager {
async indexTxNow (txBuff, blockHeight = null) {
const txid = await this.blobs.pushTx(null, txBuff)
const rQueue = await this._execReplyQueue()
return rQueue.publishAndAwaitResponse({ txid })
this.executingSet.add(txid)
const result = await rQueue.publishAndAwaitResponse({ txid })
this.executingSet.remove(txid)
return result
}

// async indexTxid (txBuff, blockHeight = null) {
@@ -27,6 +31,8 @@
// }

async trustTxLater (txid, trust) {
if (await this.executingSet.check(txid)) { return }
this.executingSet.add(txid)
await this.trustQueue.publish({ txid, trust })
}

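With these changes indexTxNow brackets the queue round trip with the executing set, which is what lets trustTxLater return early for txids already in flight. A caller-side sketch, not part of the diff; the collaborators are assumed to be built the same way the tests build them.

const { ExecutionManager } = require('../src/execution-manager')
const { ExecutingSet } = require('../src/executing-set')

// blobs, execQueue, trustQueue and ds are assumed to be already constructed.
async function indexNow (blobs, execQueue, trustQueue, ds, rawTxBuffer) {
  const execSet = new ExecutingSet(ds)
  const manager = new ExecutionManager(blobs, execQueue, trustQueue, execSet)

  // The txid is added to the set before publishing and removed when the reply
  // arrives, so a concurrent trustTxLater for the same txid is skipped.
  return manager.indexTxNow(rawTxBuffer)
}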
5 changes: 4 additions & 1 deletion src/execution-worker.js
@@ -46,7 +46,10 @@ class ExecutionWorker {
const unknownDepsProms = result.unknownDeps.map(async txid => {
return this.execQueue.publish({ txid })
})
await Promise.all([...enableProms, ...unknownDepsProms])
const missingDeps = result.missingDeps.map(async txid => {
return this.execQueue.publish({ txid })
})
await Promise.all([...enableProms, ...unknownDepsProms, ...missingDeps])
}
}

17 changes: 13 additions & 4 deletions src/indexer.js
@@ -15,9 +15,10 @@ const { IndexerResult } = require('./model/indexer-result')
// ------------------------------------------------------------------------------------------------

class Indexer {
constructor (_database, ds, blobs, trustList, executor, network, logger) {
constructor (ds, blobs, trustList, executor, network, execSet, logger) {
this.onFailToIndex = null
this.pendingRetries = new Map()
this.execSet = execSet

this.logger = logger
this.ds = ds
@@ -47,14 +48,15 @@
if (txBuff === null) {
await this.ds.addNewTx(txid, new Date(), null)
await this.ds.setTransactionExecutionFailed(txid)
// console.log(res)
return new IndexerResult(
const result = new IndexerResult(
false,
[],
[],
[],
await this.ds.searchDownstreamTxidsReadyToExecute(txid)
)
await this.execSet.remove(txid)
return result
} else {
return this.indexTransaction(txBuff, blockHeight)
}
@@ -67,6 +69,12 @@
const time = new Date()
await this.ds.addNewTx(txid, time, blockHeight)

const result = await this._doIndexing(txid, txBuf)
await this.execSet.remove(txid)
return result
}

async _doIndexing (txid, txBuf) {
const indexed = await this.ds.txIsIndexed(txid)
if (indexed) {
return new IndexerResult(true, [], [], [], await this.ds.searchDownstreamTxidsReadyToExecute(txid))
@@ -234,7 +242,6 @@
await this.ds.performOnTransaction(async (ds) => {
await ds.setExecutedForTx(txid, 1)
await ds.setIndexedForTx(txid, 1)
await ds.removeTxFromExecuting(txid)

for (const key of Object.keys(cache)) {
if (key.startsWith('jig://')) {
@@ -251,6 +258,8 @@
)
} else if (key.startsWith('berry://')) {
const location = key.slice('berry://'.length)
const klass = classes.find(([loc]) => loc === location)
await ds.setBerryMetadata(location, klass && klass[1])
await this.blobs.pushJigState(location, cache[key])
}
}
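For context, classes is the list of [location, constructorOrigin] pairs produced by the worker (next file); a small sketch with made-up values shows what the find call stores in the berry's class column.

// Hypothetical data shaped like the worker's classes output.
const classes = [
  ['aaa111_o1', 'bbb222_o1'], // [creation location, constructor origin]
  ['aaa111_o2', 'bbb222_o1']
]

const location = 'aaa111_o2'
const klass = classes.find(([loc]) => loc === location)
console.log(klass && klass[1]) // 'bbb222_o1', or undefined when the location is absent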
5 changes: 3 additions & 2 deletions src/worker/worker.js
@@ -105,8 +105,9 @@ async function execute ({ txid, hex, trustList }) {
await tx.cache()

const cache = run.cache.newStates
const jigs = tx.outputs.filter(creation => creation instanceof Run.Jig)
const classes = jigs.map(jig => [jig.location, jig.constructor.origin])
// const jigs = tx.outputs.filter(creation => creation instanceof Run.Jig)
const jigs = await Promise.all(Object.keys(cache).map(key => key.split('://')[1]).map(location => run.load(location)))
const classes = jigs.map(jig => [jig.location.replace(/&hash=[a-fA-F0-9]*/, ''), jig.constructor.origin])
const creationsWithLocks = tx.outputs.filter(creation => creation.owner instanceof Run.api.Lock)
const customLocks = creationsWithLocks.map(creation => [creation.location, creation.owner])
const locks = customLocks.map(([location, lock]) => [location, lock.constructor.origin])
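The replace call in the classes mapping strips the &hash= fragment from loaded locations so they line up with the berry:// cache keys the indexer iterates. A tiny sketch of the regex, with a made-up location string.

const loaded = 'aaa111_o1?berry=bbb222_o1&version=5&hash=0123456789abcdef' // hypothetical berry location
const normalized = loaded.replace(/&hash=[a-fA-F0-9]*/, '')
console.log(normalized) // 'aaa111_o1?berry=bbb222_o1&version=5'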
48 changes: 8 additions & 40 deletions test/crawler.test.js
@@ -10,15 +10,14 @@ const { def, get } = require('bdd-lazy-var/getter')
const Indexer = require('../src/indexer')
const { DbTrustList } = require('../src/trust-list/db-trust-list')
const { Executor } = require('../src/execution/executor')
const { KnexDatasource } = require('../src/data-sources/knex-datasource')
const knex = require('knex')
const { KnexBlobStorage } = require('../src/data-sources/knex-blob-storage')
const { Crawler } = require('../src/crawler')
const { TestBlockchainApi } = require('../src/blockchain-api/test-blockchain-api')
const Run = require('run-sdk')
const { ExecutionManager } = require('../src/execution-manager')
const { MemoryQueue } = require('../src/queues/memory-queu')
const { ExecutionWorker } = require('../src/execution-worker')
const { ExecutingSet } = require('../src/executing-set')
const { buildBlobs, buildDs } = require('./test-env')

// ------------------------------------------------------------------------------------------------
// Globals
@@ -31,61 +30,30 @@ const logger = { info: () => {}, warn: () => {}, error: () => {}, debug: () => {
// ------------------------------------------------------------------------------------------------

describe('Crawler', () => {
def('ds', () => {
const knexInstance = knex({
client: 'sqlite3',
connection: {
filename: 'file:memDbMain?mode=memory&cache=shared',
flags: ['OPEN_URI', 'OPEN_SHAREDCACHE']
},
migrations: {
tableName: 'migrations',
directory: 'db-migrations'
},
useNullAsDefault: true
})

return new KnexDatasource(knexInstance, logger, false)
})
def('ds', () => buildDs())

def('trustList', () => {
return new DbTrustList()
})

def('blobs', () => {
const blobsKnex = knex({
client: 'sqlite3',
connection: {
filename: 'file:memDbBlobs?mode=memory&cache=shared',
flags: ['OPEN_URI', 'OPEN_SHAREDCACHE']
},
migrations: {
tableName: 'migrations',
directory: 'blobs-migrations'
},
useNullAsDefault: true
})

return new KnexBlobStorage(blobsKnex, {
serialize: JSON.stringify,
deserialize: JSON.parse
})
})
def('blobs', () => buildBlobs())

def('network', () => 'test')

def('executor', () => {
return new Executor(get.network, 1, get.blobs, get.ds, logger, {})
})

def('execSet', () => new ExecutingSet(get.ds))

def('indexer', () => {
return new Indexer(null, get.ds, get.blobs, get.trustList, get.executor, get.network, logger)
return new Indexer(get.ds, get.blobs, get.trustList, get.executor, get.network, get.execSet, logger)
})

def('execQueue', () => new MemoryQueue())
def('trustQueue', () => new MemoryQueue())

def('indexManager', () => new ExecutionManager(get.blobs, get.execQueue, get.trustQueue))
def('indexManager', () => new ExecutionManager(get.blobs, get.execQueue, get.trustQueue, get.execSet))

def('api', () => {
return new TestBlockchainApi()