Skip to content
Permalink
Browse files

Merge pull request #7 from khaihkd/master

 add config crawl from block to block && fix bug finish process
  • Loading branch information...
khaihkd committed Jul 25, 2018
2 parents 52bd9f6 + 212ca19 commit a0321b0c206af98c9cf201ecb03e9d3b6ac8de0d
@@ -10,8 +10,10 @@
},
"provider": {
"ws": "wss://mainnet.infura.io/ws",
"http": "https://testnet.tomochain.com"
"http": "https://mainnet.infura.io"
},
"tomoAddress": "0x8b353021189375591723e7384262f45709a3c3dc",
"fromBlock": 5168958,
"toBlock": 5995800,
"processNumber": 6
}
@@ -1,24 +1,32 @@
'use strict'

const config = require('config')
const db = require('./models/mongodb')
const q = require('./queues')
const web3 = require('./models/blockchain/chain')

let sleep = (time) => new Promise((resolve) => setTimeout(resolve, time))

async function crawlProcess() {
for (let i = 5168958; i < 5995800 ; i++) {
if (i !== 5168958 && i !== 5169011 && i !== 5169173 && i < 5175169) {
continue
}
if (i % 10 === 0) {
console.log('Sleep 120 seconds')
await sleep(120000)
for (let i = config.get('fromBlock'); i < config.get('toBlock'); i++) {

/*
TODO: Remove this check.
It is only for Tomocoin. We break some first block because there is no transaction about Tomocoin event transfer
*/
if (config.get('tomoAddress') === '0x8b353021189375591723e7384262f45709a3c3dc'){
if (i !== 5168958 && i !== 5169011 && i !== 5169173 && i < 5175169) {
continue
}
}
// if (i % 10 === 0) {
// console.log('Sleep 120 seconds')
// await sleep(120000)
// }

let block = await web3.eth.getBlock(i);
let b = await db.Block.findOne({blockNumber: i})
if (b && b.isProcess) {
if (b && b.isFinish) {
continue
}
await db.Block.findOneAndUpdate({blockNumber: block.number}, {
@@ -35,7 +43,6 @@ async function crawlProcess() {
await q.create('newTransaction', {transactions: listTransactions.toString(), blockNumber: block.number})
.attempts(5).backoff({delay: 10000})
.priority('low').removeOnComplete(true).save()
await db.Block.findOneAndUpdate({blockNumber: i}, {isProcess: true}, { upsert: true, new: true })
}
}
}
@@ -24,7 +24,7 @@ const Block = new Schema({
timestamp: {
type: Number
},
isProcess: {
isFinish: {
type: Boolean
}
}, { timestamps: false })
@@ -23,7 +23,10 @@ const Transaction = new Schema({
tokenAmount: {
type: Number
},
isProcess: {
isAddToken: {
type: Boolean
},
isSubToken: {
type: Boolean
}
}, { timestamps: false })
@@ -5,8 +5,10 @@ const consumer = {}
consumer.name = 'addAmountToWallet'

consumer.task = async function(job, done) {
let fromWallet = job.data.fromWallet
let toWallet = job.data.toWallet
let tokenAmount = job.data.tokenAmount
let transactionHash = job.data.transactionHash
console.log(' - add ', tokenAmount, 'TOMO to wallet: ', toWallet)
let wallet = await db.Wallet.findOneAndUpdate({address: toWallet}, {address: toWallet}, { upsert: true, new: true })
if (wallet.balance) {
@@ -22,6 +24,11 @@ consumer.task = async function(job, done) {

wallet.save()

await db.Transaction.findOneAndUpdate(
{hash: transactionHash, fromAccount: fromWallet, toAccount: toWallet},
{isAddToken: true}, { upsert: true, new: true }
)

done();

};
@@ -19,7 +19,12 @@ fs.readdirSync(__dirname)
})
.forEach(function (file) {
let consumer = require(path.join(__dirname, file));
q.process(consumer.name, consumer.task);
if (consumer.name === 'newTransaction') {
q.process(consumer.name, config.get('processNumber'), consumer.task);

} else {
q.process(consumer.name, consumer.task);
}
});

module.exports = q;
@@ -6,7 +6,9 @@ consumer.name = 'subAmountFromWallet'

consumer.task = async function(job, done) {
let fromWallet = job.data.fromWallet
let toWallet = job.data.toWallet
let tokenAmount = job.data.tokenAmount
let transactionHash = job.data.transactionHash
console.log(' - sub ', tokenAmount, 'TOMO from wallet: ', fromWallet)
let wallet = await db.Wallet.findOneAndUpdate({address: fromWallet}, {address: fromWallet}, { upsert: true, new: true })
if (wallet.balance) {
@@ -21,6 +23,11 @@ consumer.task = async function(job, done) {
}
wallet.save()

await db.Transaction.findOneAndUpdate(
{hash: transactionHash, fromAccount: fromWallet, toAccount: toWallet},
{isSubToken: true}, { upsert: true, new: true }
)

done();
};

@@ -33,7 +33,7 @@ consumer.task = async function(job, done) {
console.log(' - Found Transfer', tokenAmount, ' TOMO from: ', fromWallet, ' to: ', toWallet)

let tran = await db.Transaction.findOne({hash: transaction.transactionHash, fromAccount: fromWallet})
if (tran && tran.isProcess) {
if (tran && tran.isAddToken && tran.isSubToken) {
continue
}

@@ -61,20 +61,14 @@ consumer.task = async function(job, done) {
.attempts(5).backoff({delay: 10000})
.priority('critical').removeOnComplete(true).save()

await db.Transaction.findOneAndUpdate(
{hash: transaction.transactionHash, fromAccount: fromWallet, toAccount: toWallet},
{isProcess: true}, { upsert: true, new: true }
)

}
}

}
}
console.log('Finish process block: ', blockNumber)



await db.Block.findOneAndUpdate({blockNumber: blockNumber}, {isFinish: true}, { upsert: true, new: true })
done()
};

0 comments on commit a0321b0

Please sign in to comment.
You can’t perform that action at this time.