Skip to content

Commit

Permalink
refactor: check balance before sealing
Browse files Browse the repository at this point in the history
  • Loading branch information
mvayngrib committed Feb 20, 2018
1 parent e2999ab commit e2475d2
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 37 deletions.
75 changes: 65 additions & 10 deletions src/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import {
Tradle
} from './types'

import Errors = require('./errors')

// interface IBlockchainIdentifier {
// flavor: string,
// networkName: string,
// minBalance: string
// }

type BalanceValue = string | number

interface IKey {
fingerprint: string
priv: string
Expand All @@ -25,6 +29,43 @@ interface ISealable {
basePubKey: any
}

interface ISealOpts {
key: IKey
link: string
addresses: string[]
balance?: BalanceValue
[x: string]: any
}

const compareNums = (a, b) => a < b ? -1 : a === b ? 0 : 1
const compareBalance = (a, b) => {
if (typeof a === 'number' && typeof b === 'number') {
return compareNums(a, b)
}

if (typeof a === 'number') {
a = a.toString(16)
}

if (typeof b === 'number') {
b = b.toString(16)
}

if (typeof a !== 'string' || typeof b !== 'string') {
throw new Error('expected numbers or hex strings')
}

const padLength = a.length - b.length
if (padLength > 0) {
b = '0'.repeat(padLength) + b
} else if (padLength > 0) {
a = '0'.repeat(padLength) + a
}

// can compare like nums
return compareNums(a, b)
}

export default class Blockchain {
public flavor: string
public networkName: string
Expand Down Expand Up @@ -145,18 +186,30 @@ export default class Blockchain {
// return getTxsForAddresses(addresses)
// })

public seal = async ({ key, link, addresses, counterparty }) => {
public seal = async ({ key, link, addresses, balance }: ISealOpts) => {
const writer = this.getWriter(key)
this.start()
this.logger.debug(`sealing ${link}`)
return await writer.send({
to: addresses.map(address => {
return {
address,
amount: this.getTxAmount()
}
if (typeof balance === 'undefined') {
balance = await this.balance()
}

const amount = this.getTxAmount()
if (compareBalance(balance, amount) === -1) {
throw new Errors.LowFunds(`have ${balance}, need at least ${amount}`)
}

try {
return await writer.send({
to: addresses.map(address => ({ address, amount }))
})
})
} catch (err) {
if (Errors.matches(err, { message: /insufficient/i })) {
throw new Errors.LowFunds(err.message)
}

throw err
}
}

public sealPubKey = (opts: ISealable) => {
Expand Down Expand Up @@ -213,13 +266,15 @@ export default class Blockchain {

public balance = async (opts: {
address?: string
}={}) => {
}={}):Promise<BalanceValue> => {
let { address } = opts
if (!address) {
address = await this.getMyChainAddress()
}

return this.addressesAPI.balance(address)
const balance = await this.addressesAPI.balance(address)
this.logger.debug(`balance: ${balance}`)
return balance
}
}

Expand Down
1 change: 1 addition & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ const errors = {
Exists: createError('Exists'),
HttpError,
Timeout: createError('Timeout'),
LowFunds: createError('LowFunds'),
export: (err:Error):any => {
if (err instanceof ExportableError) {
return (err as ExportableError).toJSON()
Expand Down
57 changes: 37 additions & 20 deletions src/seals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const WATCH_TYPE = {
const RESEAL_ENABLED = false
const DEFAULT_WRITE_GRACE_PERIOD = 6 * 3600 * 1000
const TIMESTAMP_MULTIPLIER = 1e3 // milli -> micro
const acceptAll = val => true

type SealRecordOpts = {
key?: IECMiniPubKey
Expand Down Expand Up @@ -84,6 +85,7 @@ type Seal = {
txId?: string
// nanoseconds
timeSealed?: number
canceledWrite?: string
}

type SealMap = {
Expand Down Expand Up @@ -242,13 +244,9 @@ export default class Seals {

const {
blockchain,
provider,
getUnsealed,
recordWriteSuccess,
recordWriteError
provider
} = this


let { limit=Infinity, key } = opts
if (!key) {
key = await provider.getMyChainKey()
Expand All @@ -257,16 +255,18 @@ export default class Seals {
const pending = await this.getUnsealed({ limit })
this.logger.info(`found ${pending.length} pending seals`)
let aborted
// TODO: update balance after every tx
const balance = this.blockchain.balance ? await this.blockchain.balance() : undefined
const results = await seriesMap(pending, async (sealInfo: Seal) => {
if (aborted) return

const { link, address, counterparty } = sealInfo
const addresses = [address]
let result
try {
result = await this.blockchain.seal({ addresses, link, key, counterparty })
result = await this.blockchain.seal({ addresses, link, key, counterparty, balance })
} catch (error) {
if (/insufficient/i.test(error.message)) {
if (Errors.matches(error, Errors.LowFunds)) {
this.logger.error(`aborting, insufficient funds, send funds to ${key.fingerprint}`)
aborted = true
}
Expand Down Expand Up @@ -408,19 +408,38 @@ export default class Seals {
await this._requeueWrites(unconfirmed)
}

public cancelPending = async (opts?:any):Promise<Seal[]> => {
let { limit=Infinity, filter=acceptAll } = opts
let seals = await this.getUnsealed({ limit })
if (!seals.length) return

seals = seals.filter(filter)
if (!seals.length) return

this.logger.debug('canceling writes', seals.map(seal => _.pick(seal, ['blockchain', 'network', 'address', 'link'])))

const now = timestamp()
const puts = seals.map(seal => ({
...seal,
canceledWrite: String(now),
unsealed: null
}))

await this.table.batchPut(puts)
return seals
}

private _requeueWrites = async (seals:Seal[]):Promise<Seal[]> => {
if (!seals.length) return

this.logger.debug('failed writes', seals.map(seal => _.pick(seal, ['timeSealed', 'txId'])))

const now = timestamp()
const puts = seals.map(seal => {
return {
..._.omit(seal, ['unconfirmed', 'txId']),
unsealed: String(now),
txId: null
}
})
const puts = seals.map(seal => ({
..._.omit(seal, ['unconfirmed']),
unsealed: String(now),
txId: null
}))

await this.table.batchPut(puts)
return seals
Expand All @@ -432,12 +451,10 @@ export default class Seals {
this.logger.debug('failed reads', seals.map(seal => _.pick(seal, ['address', 'link'])))

const now = timestamp()
const puts = seals.map(seal => {
return {
..._.omit(seal, 'unconfirmed'),
unwatched: String(now)
}
})
const puts = seals.map(seal => ({
..._.omit(seal, 'unconfirmed'),
unwatched: String(now)
}))

await this.table.batchPut(puts)
return seals
Expand Down
17 changes: 10 additions & 7 deletions src/test/seals.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { recreateTable } from './utils'
import Tradle from '../tradle'
import { Env } from '../env'
import Errors = require('../errors')
import { loudAsync } from '../utils'
const aliceKeys = require('./fixtures/alice/keys')
const bobKeys = require('./fixtures/bob/keys')
const aliceIdentity = require('./fixtures/alice/identity')
Expand All @@ -41,7 +42,7 @@ const rejectEtherscanCalls = () => {

rejectEtherscanCalls()

test('handle failed reads/writes', async (t) => {
test('handle failed reads/writes', loudAsync(async (t) => {
const { flavor, networkName } = blockchainOpts
const env = new Env(process.env)
env.BLOCKCHAIN = blockchainOpts
Expand All @@ -65,8 +66,8 @@ test('handle failed reads/writes', async (t) => {
t.equal(failedReads.length, 1)

const stubSeal = sinon.stub(seals.blockchain, 'seal').resolves({ txId: 'sometxid' })
const stubBalance = sinon.stub(seals.blockchain, 'balance').resolves('aabbccddeeff')
await seals.sealPending()

let failedWrites = await seals.getFailedWrites({ gracePeriod: 1 }) // 1ms
t.equal(failedWrites.length, 1)

Expand Down Expand Up @@ -97,9 +98,9 @@ test('handle failed reads/writes', async (t) => {
t.equal(stubSeal.callCount, 2)

t.end()
})
}))

test('queue seal', async (t) => {
test('queue seal', loudAsync(async (t) => {
const env = new Env(process.env)
env.BLOCKCHAIN = blockchainOpts

Expand All @@ -126,6 +127,8 @@ test('queue seal', async (t) => {
return { txId }
})

const stubBalance = sinon.stub(seals.blockchain, 'balance').resolves('aabbccddeeff')

const stubGetTxs = sinon.stub(blockchain, 'getTxsForAddresses')
.callsFake(function (addresses, blockHeight) {
return Promise.resolve([
Expand Down Expand Up @@ -211,9 +214,9 @@ test('queue seal', async (t) => {
stubObjectsGet.restore()
stubDBUpdate.restore()
t.end()
})
}))

test('corda seals', async (t) => {
test('corda seals', loudAsync(async (t) => {
const table = await recreateTable(SealsTableLogicalId)
const env = new Env(process.env)
const blockchainOpts = env.BLOCKCHAIN = {
Expand Down Expand Up @@ -312,4 +315,4 @@ test('corda seals', async (t) => {

t.same(_.pick(saved, Object.keys(expected)), expected)
t.end()
})
}))

0 comments on commit e2475d2

Please sign in to comment.