Skip to content
Permalink
Browse files

Fixed capability tracking. Fixed some debug messaging (INFURA#7)

* Fixed capability tracking. Fixed some debug messaging

* Removed force:true

* Added error tracking and a debug command

* Fixed an issue with country; code reformat; increased refresh interval
  • Loading branch information...
kshinn authored and egalano committed Feb 2, 2019
1 parent 2873af4 commit 02c8cf595e9ea37d3b4bfd8b7b17765ba6a5df67
Showing with 4,567 additions and 58 deletions.
  1. +83 −54 index.js
  2. +35 −3 lib/db.js
  3. +3 −1 package.json
  4. +4,446 −0 yarn.lock
137 index.js
@@ -4,18 +4,20 @@ const Buffer = require('safe-buffer').Buffer
const { randomBytes } = require('crypto')
const devp2p = require('ethereumjs-devp2p')
const geoip = require('geoip-lite')
const debug = require('debug')('infura:dpt')
const debug = require('debug')('infura:main')
const Web3 = require('web3')
const web3 = new Web3()
const _ = require('lodash')
const db = require('./lib/db')()
const Common = require('ethereumjs-common')
const c = new Common('mainnet')
const web3Url = `https://mainnet.infura.io/v3/${process.env.INFURA_ID}`
web3.setProvider(new web3.providers.HttpProvider(web3Url))

const EthPeer = db.EthPeer
const { EthPeer, PeerErr } = db

const PRIVATE_KEY = randomBytes(32)
const BOOTNODES = require('ethereum-common').bootstrapNodes.map((node) => {
const BOOTNODES = c.bootstrapNodes().map((node) => {
return {
address: node.ip,
udpPort: node.port,
@@ -33,13 +35,16 @@ const myStatus = {
genesisHash: GENESIS_HASH,
bestHash: GENESIS_HASH
}

const dpt = new devp2p.DPT(Buffer.from(PRIVATE_KEY, 'hex'), {
endpoint: {
address: '0.0.0.0',
udpPort: null,
tcpPort: null
}
},
refreshInterval: 20000 // refresh every 20s
})

// RLPx
const rlpx = new devp2p.RLPx(PRIVATE_KEY, {
dpt: dpt,
@@ -56,66 +61,89 @@ const rlpx = new devp2p.RLPx(PRIVATE_KEY, {
rlpx.on('error', (err) => console.error(chalk.red(`RLPx error: ${err.stack || err}`)))

rlpx.on('peer:added', (peer) => {
let hello = peer.getHelloMessage()
let peerGeo = geoip.lookup(peer._socket.remoteAddress)
let b = new EthPeer({
address: peer._socket.remoteAddress,
capabilitites: hello.capabilities,
let hello = peer.getHelloMessage()
let capabilityStr = _.map(hello.capabilities, (cap) => { return `${cap.name}.${cap.version}` })
let { remoteAddress, remotePort } = peer._socket
let splitClientId = hello.clientId.split('/')
let peerGeo = geoip.lookup(remoteAddress)

peer.on('error', (err) => {
debug(`${remoteAddress}:${remotePort} (Peer Error) ${err}`)
PeerErr.create({
address: remoteAddress,
capabilities: capabilityStr,
clientId: hello.clientId,
country: _.has(peerGeo, 'country') ? peerGeo.country : null,
enode: hello.id.toString('hex'),
port: peer._socket.remotePort,
timestamp: new Date(),
error: err.message,
clientMeta1: splitClientId[0],
clientMeta2: splitClientId[1],
clientMeta3: splitClientId[2],
clientMeta4: splitClientId[3],
latitude: peerGeo.ll[0],
longitude: peerGeo.ll[1]
}).then(() => {
debug('Saved peer error')
}).catch((err) => {
debug(`Error saving peerErr: ${err}`)
})
})

debug(`${remoteAddress}: ${capabilityStr}`)
let b = EthPeer.build({
address: remoteAddress,
capabilities: capabilityStr,
clientId: hello.clientId,
enode: hello.id.toString('hex'),
port: peer._socket.remotePort,
timestamp: new Date(),
country: _.has(peerGeo, 'country') ? peerGeo.country : null,
city: _.has(peerGeo, 'city') ? peerGeo.city : null,
latitude: peerGeo.ll[0],
longitude: peerGeo.ll[1],
clientMeta1: splitClientId[0],
clientMeta2: splitClientId[1],
clientMeta3: splitClientId[2],
clientMeta4: splitClientId[3]
})

if (_.has(peerGeo, 'll.country')) {
b.country = peerGeo.ll.country
}
if (_.has(peerGeo, 'city')) {
b.city = peerGeo.city
}
b.latitude = peerGeo.ll[0]
b.longitude = peerGeo.ll[1]

var splitClientId = hello.clientId.split('/')
b.clientMeta1 = splitClientId[0]
b.clientMeta2 = splitClientId[1]
b.clientMeta3 = splitClientId[2]
b.clientMeta4 = splitClientId[3]

const eth = peer.getProtocols()[0]
eth.sendStatus(myStatus)
eth.once('status', (peerStatus) => {
debug(`Received status ${peer._socket.remoteAddress}`)
b.bestHash = '0x' + peerStatus.bestHash.toString('hex')
b.totalDifficulty = peerStatus.td.toString('hex')
web3.eth.getBlock(b.bestHash, false)
.then((block) => {
if (_.has(block, 'number')) {
debug(`Received: ${block.number}`)
b.bestBlockNumber = block.number
}
return web3.eth.getBlockNumber()
})
.then((infuraBlockNumber) => {
debug(`Infura Block: ${infuraBlockNumber}`)
b.infuraBlockNumber = infuraBlockNumber
b.infuraDrift = Math.abs(b.infuraBlockNumber - b.bestBlockNumber) || 0
debug(`Found Drift: ${b.infuraDrift}`)
// db.on('error', console.error.bind(console, 'connection error:'))
b.save().then((ethpeer) => {
debug('Saved peer: ' + ethpeer.enode)
})
})
.catch(function (err) {
console.error(err)
const eth = peer.getProtocols()[0]
eth.sendStatus(myStatus)

eth.once('status', (peerStatus) => {
debug(`${remoteAddress}: Received status`)
b.bestHash = '0x' + peerStatus.bestHash.toString('hex')
b.totalDifficulty = peerStatus.td.toString('hex')
web3.eth.getBlock(b.bestHash, false)
.then((block) => {
if (_.has(block, 'number')) {
debug(`Received: ${block.number}`)
b.bestBlockNumber = block.number
}
return web3.eth.getBlockNumber()
})
.then((infuraBlockNumber) => {
debug(`Infura Block: ${infuraBlockNumber}`)
b.infuraBlockNumber = infuraBlockNumber
b.infuraDrift = Math.abs(b.infuraBlockNumber - b.bestBlockNumber) || 0
debug(`Found Drift: ${b.infuraDrift}`)
// db.on('error', console.error.bind(console, 'connection error:'))
b.save().then((ethpeer) => {
debug(`${remoteAddress}: Saved peer ${ethpeer.enode}`)
})
})
})
.catch(function (err) {
console.error(err)
})
})
})

rlpx.on('peer:removed', (peer) => {
// console.log(peer.getDisconnectPrefix(peer._disconnectReason))
const eth = peer.getProtocols()[0]
eth.sendStatus(myStatus)
debug(peer._socket.remoteAddress, peer.getDisconnectPrefix(peer._disconnectReason))
// const eth = peer.getProtocols()[0]
// eth.sendStatus(myStatus)
})

for (let bootnode of BOOTNODES) {
@@ -128,6 +156,7 @@ dpt.on('error', (err) => console.error(chalk.red(err.stack || err)))
dpt.on('peer:added', (peer) => {
// const info = `(${peer.id.toString('hex')},${peer.address},${peer.udpPort},${peer.tcpPort})`
// console.log(chalk.green(`New peer: ${info} (total: ${dpt.getPeers().length})`))
debug(`DHT peer count: ${dpt.getPeers().length}`)
})

dpt.on('peer:removed', (peer) => {
@@ -24,6 +24,22 @@ const EthPeerSchema = {
totalDifficulty: {type: Sequelize.STRING},
}

const PeerErrorSchema = {
address: { type: Sequelize.STRING, primary_key: true },
capabilities: { type: Sequelize.ARRAY(Sequelize.STRING) },
clientId:{type: Sequelize.STRING, index: true},
clientMeta1:{type: Sequelize.STRING, index: true},
clientMeta2:{type: Sequelize.STRING, index: true},
clientMeta3:{type: Sequelize.STRING, index: true},
clientMeta4:{type: Sequelize.STRING, index: true},
country: {type: Sequelize.STRING },
enode: {type: Sequelize.STRING, unique: true},
latitude: {type: Sequelize.FLOAT },
longitude: {type: Sequelize.FLOAT },
error: {type: Sequelize.STRING},
timestamp:{type: Sequelize.DATE, index: true},
}

module.exports = (opts) => {
const options = _.defaults(opts || {}, {
dbUser: 'pguser',
@@ -70,7 +86,8 @@ module.exports = (opts) => {
fields: ['infuraDrift'],
method: 'BTREE'
},
{ fields: ['timestamp'],
{
fields: ['timestamp'],
method: 'BTREE'
}
]
@@ -79,6 +96,21 @@ module.exports = (opts) => {
EthPeer.sync().then(() => {
debug('EthPeers table created')
})
const getConnection = () => { return conn }
return { EthPeer, getConnection }

const PeerErr = conn.define('peer_err', PeerErrorSchema, {
indexes: [
{
fields: ['timestamp'],
method: 'BTREE'
}
]
})

PeerErr.sync().then(() => {
debug('PeerErr table created')
})

const getConnection = () => { return conn }

return { EthPeer, PeerErr, getConnection }
}
@@ -10,6 +10,7 @@
"ethereum-common": "^0.2.1",
"ethereum-tx-decoder": "^1.0.2",
"ethereumjs-block": "^1.7.1",
"ethereumjs-common": "^0.6.1",
"ethereumjs-devp2p": "^2.5.1",
"ethereumjs-tx": "^1.3.7",
"geoip-lite": "^1.3.5",
@@ -42,7 +43,8 @@
},
"scripts": {
"start": "INFURA_ID=09bcdf3656ee4a80b05a4a921358d109 node index.js",
"test": "echo \"Error: no test specified\" && exit 1"
"test": "echo \"Error: no test specified\" && exit 1",
"debug": "DEBUG=devp2p:rplx:*,devp2p:dlt*,infura:* INFURA_ID=09bcdf3656ee4a80b05a4a921358d109 node --inspect index.js"
},
"author": {
"name": "E.G. Galano",

0 comments on commit 02c8cf5

Please sign in to comment.
You can’t perform that action at this time.