Skip to content

Commit

Permalink
fixes and fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
hojarasca committed Jun 23, 2022
1 parent cc3cf66 commit 340d58c
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 75 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ services:
PORT: 3000
MAIN_DB_CONNECTION_URI: postgres://someuser:sosecret@psql:5432/rundb_regtest
BLOB_DB_CONNECTION_URI: postgres://someuser:sosecret@psql:5432/blobs_regtest
BITCOIND_REST_URL: http://bitcoind:18332/rest
RABBITMQ_URI: amqp://guest:guest@rabbitmq:5672

psql:
Expand Down
55 changes: 27 additions & 28 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"stream-json": "^1.7.4"
},
"optionalDependencies": {
"zeromq": "^5.2.8"
"zeromq": "^6.0.0-beta.6"
},
"devDependencies": {
"@runonbitcoin/nimble": "^1.0.13",
Expand Down
4 changes: 3 additions & 1 deletion src/bin/start-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const {
RabbitQueue,
buildMainServer
} = require('../index')
const { ExecutingSet } = require('../executing-set')

// ------------------------------------------------------------------------------------------------
// Globals
Expand Down Expand Up @@ -69,7 +70,8 @@ async function main () {
await rabbitChannel.prefetch(20)
execQueue = new RabbitQueue(rabbitChannel, 'exectx')
trustQueue = new RabbitQueue(rabbitChannel, 'trusttx')
indexManager = new ExecutionManager(blobs, execQueue, trustQueue)
const execSet = new ExecutingSet(ds)
indexManager = new ExecutionManager(blobs, execQueue, trustQueue, execSet)
server = buildMainServer(ds, blobs, indexManager, logger)
await execQueue.setUp()
await trustQueue.setUp()
Expand Down
10 changes: 9 additions & 1 deletion src/bin/start-crawler.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {
} = require('../index')

const path = require('path')
const { ExecutingSet } = require('../executing-set')

// const { Crawler, BitcoinNodeConnection, BitcoinZmq, BitcoinRpc } = require('../index')
// const { KnexBlobStorage } = require('../data-sources/knex-blob-storage')
Expand Down Expand Up @@ -86,10 +87,12 @@ const knexBlob = knex({
})
const blobs = new KnexBlobStorage(knexBlob, undefined, api)
const ds = new KnexDatasource(knexInstance)
const execSet = new ExecutingSet(ds)

let indexManager
let crawler = null
let execQueue = null
let trustQueue = null
let rabbitConnection = null

// ------------------------------------------------------------------------------------------------
Expand All @@ -101,8 +104,10 @@ async function main () {
const rabbitChannel = await rabbitConnection.createChannel()
await rabbitChannel.prefetch(20)
execQueue = new RabbitQueue(rabbitChannel, 'exectx')
trustQueue = new RabbitQueue(rabbitChannel, 'trusttx')
await execQueue.setUp()
indexManager = new ExecutionManager(blobs, execQueue)
await trustQueue.setUp()
indexManager = new ExecutionManager(blobs, execQueue, trustQueue, execSet)
await indexManager.setUp()
crawler = new Crawler(indexManager, api, ds, logger, { initialBlockConcurrency: INITIAL_BLOCK_CONCURRENCY })
await ds.setUp()
Expand All @@ -128,6 +133,9 @@ async function shutdown () {
if (execQueue !== null) {
await execQueue.tearDown()
}
if (trustQueue !== null) {
await trustQueue.tearDown()
}
if (rabbitConnection !== null) {
await rabbitConnection.close()
}
Expand Down
7 changes: 4 additions & 3 deletions src/bin/start-execution-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ const {
knex,
KnexBlobStorage,
TrustAllTrustList,
Executor,
RabbitQueue,
ExecutionWorker
ExecutionWorker,
execution: { Executor }
} = require('../index')
const { ExecutingSet } = require('../executing-set')

Expand Down Expand Up @@ -77,7 +77,8 @@ async function main () {
await rabbitChannel.prefetch(1)
execQueue = new RabbitQueue(rabbitChannel, 'exectx')
trustQueue = new RabbitQueue(rabbitChannel, 'trusttx')
worker = new ExecutionWorker(indexer, execQueue, trustQueue)
const execSet = new ExecutingSet(ds)
worker = new ExecutionWorker(indexer, execSet, execQueue, trustQueue)

await execQueue.setUp()
await trustQueue.setUp()
Expand Down
5 changes: 4 additions & 1 deletion src/blockchain-api/bitcoin-node-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ class BitcoinNodeConnection extends BlockchainApi {

async fetch (txid) {
const response = await fetch(`${this.url}/tx/${txid}.bin`)
if (response.status !== 200) {
throw new Error('error fetching tx')
}
return response.buffer()
}

Expand All @@ -44,7 +47,7 @@ class BitcoinNodeConnection extends BlockchainApi {
}

async onMempoolTx (fn) {
this.zmq.subscribe('rawtx', fn)
this.zmq.subscribe('hashtx', fn)
}

async onNewBlock (fn) {
Expand Down
11 changes: 6 additions & 5 deletions src/blockchain-api/bitcoin-zmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ try {

class BitcoinZmq {
constructor (url) {
this.sock = zmq.socket('sub')
this.sock = new zmq.Subscriber()
this.url = url
this.handlers = new Map()
}

async connect () {
this.sock.connect(this.url)
this.sock.on('message', (topic, message) => {
await this.sock.connect(this.url)
for await (const [topic, msg] of this.sock) {
const txid = msg.toString('hex')
const handler = this.handlers.get(topic.toString())
if (handler) {
handler(message)
handler(txid)
}
})
}
}

async subscribe (topic, handler) {
Expand Down
3 changes: 2 additions & 1 deletion src/crawler.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class Crawler {
await this.api.setUp()
}

async _receiveTransaction (rawTx, blockHeight = null) {
async _receiveTransaction (txid, blockHeight = null) {
const rawTx = await this.api.fetch(txid)
return this.execManager.indexTxNow(rawTx, blockHeight)
}

Expand Down
4 changes: 3 additions & 1 deletion src/data-sources/knex-datasource.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,11 @@ class KnexDatasource {
return !!(result && result.indexed)
}

async searchNonExecutedTxs () {
async searchNonExecutedTxs (limit) {
return this.knex(TX.NAME)
.where(TX.executed, false)
.orderBy(TX.time, 'ASC')
.limit(limit)
.pluck(TX.txid)
}

Expand Down
3 changes: 1 addition & 2 deletions src/execution-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ExecutionManager {
const txid = await this.blobs.pushTx(null, txBuff)
const rQueue = await this._execReplyQueue()
this.executingSet.add(txid)
const result = await rQueue.publishAndAwaitResponse({ txid })
const result = await rQueue.publishAndAwaitResponse({ txid, blockHeight: blockHeight })
this.executingSet.remove(txid)
return result
}
Expand All @@ -31,7 +31,6 @@ class ExecutionManager {
// }

async trustTxLater (txid, trust) {
if (this.executingSet.check(txid)) { return }
this.executingSet.add(txid)
await this.trustQueue.publish({ txid, trust })
}
Expand Down
22 changes: 12 additions & 10 deletions src/execution-worker.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
class ExecutionWorker {
constructor (indexer, execQueue, trustQueue) {
constructor (indexer, execSet, execQueue, trustQueue) {
this.indexer = indexer
this.execQueue = execQueue
this.trustQueue = trustQueue
this.execSet = execSet
this.execSubscription = null
this.trustSubscription = null
}

async setUp () {
this.execSubscription = await this.execQueue.subscribe(async ({ txid, blockHeight }) => {
// console.log('starting: ', txid)
try {
const result = await this.indexer.indexTxid(txid, blockHeight)
await this._handleIndexResult(result)
Expand Down Expand Up @@ -40,16 +40,18 @@ class ExecutionWorker {
}

async _handleIndexResult (result) {
const enableProms = result.enables.map(async txid => {
return this.execQueue.publish({ txid })
})
const unknownDepsProms = result.unknownDeps.map(async txid => {
return this.execQueue.publish({ txid })
})
const missingDeps = result.missingDeps.map(async txid => {
const newTxidsToIndex = await Promise.all([...result.unknownDeps, ...result.missingDeps].map(async txid => {
return this.execSet.check(txid) ? null : txid // We only want the ones that are not there.
}))
.then(list => list.filter(txid => txid))
.then(list => [...list, ...result.enables]) // the enable ones are always included to avoid race conditions.

const promises = newTxidsToIndex.map(async txid => {
await this.execSet.add(txid)
return this.execQueue.publish({ txid })
})
await Promise.all([...enableProms, ...unknownDepsProms, ...missingDeps])

await Promise.all(promises)
}
}

Expand Down
1 change: 0 additions & 1 deletion src/execution/api-executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class ApiExecutor {

async execute (txid) {
if (this.executing.has(txid)) return
this.logger.debug('Enqueueing', txid, 'for execution')
const token = await this.pool.acquire()
try {
this.executing.add(txid)
Expand Down
8 changes: 1 addition & 7 deletions src/execution/executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ class Executor {

async execute (txid, trustList) {
if (this.executing.has(txid)) return new ExecutionResult(false, [], null)

this.logger.debug('Enqueueing', txid, 'for execution')

const worker = await this.pool.acquire()
const txBuf = await this.blobs.pullTx(txid, () => null)

Expand All @@ -79,10 +76,7 @@ class Executor {
this.executing.add(txid)
try {
const result = await worker.send('execute', { txid, hex, trustList })
if (result.missingDeps) {
return new ExecutionResult(false, result.missingDeps, null, null)
}
return new ExecutionResult(true, [], result)
return new ExecutionResult(result.success, result.missingDeps, result, result.error)
} catch (e) {
return new ExecutionResult(false, [], null, e)
} finally {
Expand Down
Loading

0 comments on commit 340d58c

Please sign in to comment.