Skip to content

Commit

Permalink
feat: for corda, attempt to write seals immediately in sealstream
Browse files Browse the repository at this point in the history
  • Loading branch information
mvayngrib committed Mar 6, 2018
1 parent 6b0182f commit ed44708
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/bot/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ export class Bot extends EventEmitter implements IReady {
object: resource,
merge: method === 'update'
})

// await this.bot.hooks.fire(`save:${method}`, resource)
} catch (err) {
this.logger.debug(`db.${method} failed`, {
type: resource[TYPE],
Expand Down
33 changes: 27 additions & 6 deletions src/bot/lambda/onsealstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ import { batchProcess } from '../../utils'
import { Lambda } from '../../types'
import { fromDynamoDB } from '../lambda'

const Watch = {
one: 'watchseal',
batch: 'watchseals'
}

const QueueWrite = {
one: 'queueseal',
batch: 'queueseals'
}

const Read = {
one: 'readseal',
batch: 'readseals'
Expand All @@ -14,6 +24,9 @@ const Write = {
batch: 'wroteseals'
}

const toBatchEvent = event => event + 's'
const pluckData = ({ data }) => data

export const createLambda = (opts) => {
const lambda = fromDynamoDB(opts)
return lambda.use(createMiddleware(lambda, opts))
Expand All @@ -24,12 +37,15 @@ export const createMiddleware = (lambda:Lambda, opts?:any) => {
const { batchSize=10 } = opts
const processBatch = async (records) => {
const events = records.map(recordToEvent)
const [read, wrote] = splitReadWrite(events)
const byType = _.groupBy(events, 'event')

// trigger batch processors
await Promise.all([
read.map(({ data }) => bot.hooks.fire(Read.batch, data)),
wrote.map(({ data }) => bot.hooks.fire(Write.batch, data))
])
await Promise.all(Object.keys(byType).map(async (event) => {
const subset = byType[event]
if (subset) {
await bot.hooks.fire(toBatchEvent(event), subset.map(pluckData))
}
}))

// trigger per-seal-event processors
await Promise.all(events.map(({ event, data }) => {
Expand All @@ -52,11 +68,16 @@ const recordToEvent = record => ({
const recordToEventType = record => {
// when a seal is queued for a write, unsealed is set to 'y'
// when a seal is written, unsealed is set to null
const wasJustSealed = (!record.old || record.old.unsealed) && !record.new.unsealed
const wasJustSealed = record.old && record.old.unsealed && !record.new.unsealed
if (wasJustSealed) return Write.one
if (record.new.unsealed) return QueueWrite.one

// do we care about distinguishing between # of confirmations
// in terms of the event type?
if (!record.old && record.new.unconfirmed && !record.new.unsealed) {
return Watch.one
}

return Read.one
}

Expand Down
10 changes: 10 additions & 0 deletions src/corda-seals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ class CordaSeals {
})
}

public writePendingSeal = (opts) => {
return this.seals.writePendingSeal({
...opts,
key: {
priv: PLACEHOLDER,
pub: PLACEHOLDER
}
})
}

public watch = promiseNoop
public watchNextVersion = promiseNoop
public syncUnconfirmed = opts => promiseNoop
Expand Down
11 changes: 9 additions & 2 deletions src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ import debug = require('debug')
import randomName = require('random-name')
import { allSettled, RESOLVED_PROMISE } from './utils'
import { randomString } from './crypto'
import { IDebug, ILambdaAWSExecutionContext, Lambda, IRequestContext, CloudName } from './types'
import {
IDebug,
ILambdaAWSExecutionContext,
Lambda,
IRequestContext,
CloudName,
IBlockchainIdentifier
} from './types'
import { WARMUP_SOURCE_NAME, ROOT_LOGGING_NAMESPACE } from './constants'
import Logger, { Level } from './logger'

Expand Down Expand Up @@ -58,7 +65,7 @@ export default class Env {
return `${this.SERVERLESS_SERVICE_NAME}-${this.STAGE}`
}

public BLOCKCHAIN:any
public BLOCKCHAIN: IBlockchainIdentifier
public CORDA_API_URL?:string
public CORDA_API_KEY?:string
public NO_TIME_TRAVEL:boolean
Expand Down
2 changes: 1 addition & 1 deletion src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export default function createEvents ({ logger, tables, dbUtils }) {
}

if (change.new.confirmations > 0) {
return 'seal:confirm'
return 'seal:confirmed'
}

return 'seal:read'
Expand Down
16 changes: 16 additions & 0 deletions src/in-house-bot/lambda/onsealstream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@

import { createBot } from '../../bot'
import { sendConfirmedSeals } from '../utils'

const bot = createBot()
const lambda = bot.lambdas.onsealstream()
const { logger, env } = lambda
if (env.BLOCKCHAIN.flavor === 'corda') {
bot.hook('queueseal', async (seal) => {
logger.debug('attempting to write seal immediately')
try {
const result = await bot.seals.writePendingSeal({ seal })
if (result) {
await sendConfirmedSeals(bot, [result])
}
} catch (err) {
logger.error('failed to write seal', err)
}
})
}

export const handler = lambda.handler
43 changes: 29 additions & 14 deletions src/seals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,30 +272,45 @@ export default class Seals {
const results = await seriesMap(pending, async (sealInfo: Seal) => {
if (aborted) return

const { link, address, counterparty } = sealInfo
const addresses = [address]
let result
try {
result = await this.blockchain.seal({ addresses, link, key, counterparty, balance })
} catch (error) {
if (Errors.matches(error, Errors.LowFunds)) {
return await this.writePendingSeal({ seal: sealInfo, key, balance })
} catch (err) {
Errors.rethrow(err, 'developer')
if (Errors.matches(err, Errors.LowFunds)) {
this.logger.error(`aborting, insufficient funds, send funds to ${key.fingerprint}`)
aborted = true
}

await this.recordWriteError({ seal: sealInfo, error })
return
}

return await this.recordWriteSuccess({
seal: sealInfo,
txId: result.txId
})
})

return results.filter(notNull)
}

public writePendingSeal = async ({ seal, key, balance }: {
seal: Seal
key?: any
balance?: number
}):Promise<Seal> => {
if (!key) {
key = await this.provider.getMyChainKey()
}

const { link, address, counterparty } = seal
const addresses = [address]
let result
try {
result = await this.blockchain.seal({ addresses, link, key, counterparty, balance })
} catch (error) {
await this.recordWriteError({ seal, error })
throw error
}

return await this.recordWriteSuccess({
seal,
txId: result.txId
})
}

private createSealRecord = async (opts:SealRecordOpts):Promise<void> => {
if (!opts.key && opts.write) {
opts = {
Expand Down
35 changes: 33 additions & 2 deletions src/test/bot.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,18 @@ test(`readseal`, loudAsync(async (t) => {
const link = '7f358ce8842a2a0a1689ea42003c651cd99c9a618d843a1a51442886e3779411'

let read
let queuedWrite
let wrote
let watch
const tradle = createTestTradle()
const { seals, provider } = tradle
const { getMyKeys } = provider
provider.getMyKeys = () => Promise.resolve(aliceKeys)

const bot = createBot({ tradle })
bot.hook('readseal', async (event) => {
read = true

bot.hook('queueseal', async (event) => {
queuedWrite = true
t.equal(event.link, link)
})

Expand All @@ -271,7 +274,25 @@ test(`readseal`, loudAsync(async (t) => {
t.equal(event.link, link)
})

bot.hook('readseal', async (event) => {
read = true
t.equal(event.link, link)
})

bot.hook('watchseal', async (event) => {
watch = true
t.equal(event.link, link)
})

await bot.lambdas.onsealstream().handler(toStreamItems([
// queueseal
{
new: {
link,
unsealed: 'x'
}
},
// wroteseal
{
old: {
link,
Expand All @@ -281,6 +302,7 @@ test(`readseal`, loudAsync(async (t) => {
link
}
},
// readseal
{
old: {
link,
Expand All @@ -289,13 +311,22 @@ test(`readseal`, loudAsync(async (t) => {
new: {
link
}
},
// watchseal
{
new: {
link,
unconfirmed: 'x'
}
}
]), {
done: t.error
} as ILambdaAWSExecutionContext)

t.equal(read, true)
t.equal(wrote, true)
t.equal(watch, true)
t.equal(queuedWrite, true)

provider.getMyKeys = getMyKeys
t.end()
Expand Down
5 changes: 5 additions & 0 deletions src/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -479,3 +479,8 @@ export interface IGraphqlAPI {
execute: (query: string, variables?: any) => Promise<GraphqlExecutionResult>
graphiqlOptions: IGraphiqlOptions
}

export interface IBlockchainIdentifier {
flavor: string
networkName: string
}

0 comments on commit ed44708

Please sign in to comment.