feat(billing): add consumer list cache to UCAN stream consumer (#406)
This adds an LRU cache to the UCAN stream consumer for billing, so that we
can process data for spaces with frequent writes much more quickly. It should
also slightly reduce our DynamoDB read bill.

Previously we filtered the lambda's input items so that it only processes
receipts (rather than also receiving invocations, which we simply dropped).
This worked for 3 days, but iterator age has now started increasing again.

<img width="412" alt="Screenshot 2024-07-15 at 11 26 53"
src="https://github.com/user-attachments/assets/6e5ec767-c874-4dad-b3de-ba7cceee3c9c">

The lambda simply reads from the consumer table and writes to the space
diff table. The data in the consumer table almost never changes (it
actually cannot right now, as we haven't built this feature yet), so an
in-memory cache should speed up execution for spaces with high write
frequency. Many lambda invocations do no space diff accounting at all,
because they are for receipts that are not for `store/add` or `blob/add`.
When the lambda does need to do some work, we're currently seeing runtimes
of ~250ms:

<img width="391" alt="Screenshot 2024-07-15 at 11 29 58"
src="https://github.com/user-attachments/assets/ce737752-de5d-4541-bacc-7620a349c8ce">
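
To illustrate the trade-off described above, here is a minimal sketch of a size-bounded cache with a per-entry TTL. It is a simplified stand-in for the `lru-cache` package, not its implementation: `createTtlCache` and the injectable `now` clock are hypothetical names introduced only for this example.

```javascript
// Simplified stand-in for `lru-cache`: a Map-based cache with a maximum
// size and a per-entry TTL. `createTtlCache` and `now` are hypothetical.
const createTtlCache = ({ max, ttl, now = Date.now }) => {
  /** @type {Map<string, { value: unknown, expires: number }>} */
  const entries = new Map()
  return {
    get (key) {
      const entry = entries.get(key)
      if (!entry) return undefined
      if (now() >= entry.expires) {
        entries.delete(key) // stale: the caller must re-read from the table
        return undefined
      }
      // Re-insert so the Map's insertion order tracks recency of use.
      entries.delete(key)
      entries.set(key, entry)
      return entry.value
    },
    set (key, value) {
      entries.delete(key)
      entries.set(key, { value, expires: now() + ttl })
      if (entries.size > max) {
        // The first key in iteration order is the least recently used.
        entries.delete(entries.keys().next().value)
      }
    }
  }
}

// Usage with a fake clock so the TTL expiry is easy to follow.
let clock = 0
const cache = createTtlCache({ max: 2, ttl: 5 * 60 * 1000, now: () => clock })
cache.set('space-a', 'subscription-1')
const fresh = cache.get('space-a') // served from memory
clock += 5 * 60 * 1000
const stale = cache.get('space-a') // expired, so the entry is dropped
```

While an entry is fresh, repeated writes to the same space need no table read; once the TTL passes, the next lookup misses and the value is read again, which bounds how long a changed value can go unnoticed.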
alanshaw committed Jul 15, 2024
1 parent b7cf64b commit b350e5c
Showing 4 changed files with 54 additions and 6 deletions.
43 changes: 39 additions & 4 deletions billing/functions/ucan-stream.js
@@ -1,6 +1,7 @@
import * as Sentry from '@sentry/serverless'
import { toString, fromString } from 'uint8arrays'
import * as Link from 'multiformats/link'
+import { LRUCache } from 'lru-cache'
import { createSpaceDiffStore } from '../tables/space-diff.js'
import { createConsumerStore } from '../tables/consumer.js'
import { expect } from './lib.js'
@@ -45,10 +46,9 @@ export const handler = Sentry.AWSLambda.wrapHandler(
}
console.log("Storing space usage delta", deltas[0])

-const ctx = {
-spaceDiffStore: createSpaceDiffStore({ region }, { tableName: spaceDiffTable }),
-consumerStore: createConsumerStore({ region }, { tableName: consumerTable })
-}
+const consumerStore = createConsumerStore({ region }, { tableName: consumerTable })
+const spaceDiffStore = createSpaceDiffStore({ region }, { tableName: spaceDiffTable })
+const ctx = { spaceDiffStore, consumerStore: withConsumerListCache(consumerStore) }
expect(
await storeSpaceUsageDelta(deltas[0], ctx),
`storing space usage delta for: ${deltas[0].resource}, cause: ${deltas[0].cause}`
@@ -85,3 +85,38 @@ const parseUcanStreamEvent = event => {
}
})
}

+/**
+ * This means that if a subscription for a space changes, there's a 5 minute
+ * (max) period where writes may be attributed to the previous subscription.
+ *
+ * This happens very infrequently, and DynamoDB is _already_ eventually
+ * consistent on read so we're just pushing out this delay a little more to
+ * be able to process data for spaces with frequent writes a lot quicker.
+ */
+const CONSUMER_LIST_CACHE_TTL = 1000 * 60 * 5
+const CONSUMER_LIST_CACHE_MAX = 10_000
+
+/**
+ * @param {import('../lib/api').ConsumerStore} consumerStore
+ * @returns {import('../lib/api').ConsumerStore}
+ */
+const withConsumerListCache = (consumerStore) => {
+  /** @type {LRUCache<string, Awaited<ReturnType<import('../lib/api').ConsumerStore['list']>>>} */
+  const cache = new LRUCache({
+    max: CONSUMER_LIST_CACHE_MAX,
+    ttl: CONSUMER_LIST_CACHE_TTL
+  })
+  return {
+    ...consumerStore,
+    async list (key, options) {
+      const cacheKeySuffix = options ? `?cursor=${options.cursor}&size=${options.size}` : ''
+      const cacheKey = `${key.consumer}${cacheKeySuffix}`
+      const cached = cache.get(cacheKey)
+      if (cached) return cached
+      const res = await consumerStore.list(key, options)
+      if (res.ok) cache.set(cacheKey, res)
+      return res
+    }
+  }
+}
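
The wrapper above has two properties worth seeing in action: only `ok` results are cached, and pagination options become part of the cache key. The sketch below is an illustration under assumptions, not the code from this commit: `createStubStore` is a hypothetical in-memory store, the consumer ID is made up, and a plain `Map` replaces `LRUCache`, so there is no TTL or size bound here.

```javascript
// Hypothetical stand-in for the consumer store. It counts reads and fails
// its first call, to show that error results are not cached.
const createStubStore = () => {
  const stats = { reads: 0 }
  return {
    stats,
    async list (key, options) {
      stats.reads++
      if (stats.reads === 1) return { error: new Error('throttled') }
      return { ok: { results: [{ consumer: key.consumer }] } }
    }
  }
}

// Same shape as the wrapper in the diff, with a plain Map in place of
// LRUCache (an assumption made to keep the example dependency-free).
const withListCache = (store) => {
  const cache = new Map()
  return {
    ...store,
    async list (key, options) {
      const suffix = options ? `?cursor=${options.cursor}&size=${options.size}` : ''
      const cacheKey = `${key.consumer}${suffix}`
      const cached = cache.get(cacheKey)
      if (cached) return cached
      const res = await store.list(key, options)
      if (res.ok) cache.set(cacheKey, res)
      return res
    }
  }
}

const demo = async () => {
  const store = createStubStore()
  const cachedStore = withListCache(store)
  const key = { consumer: 'did:example:space' } // made-up identifier
  const first = await cachedStore.list(key) // error result, not cached
  const second = await cachedStore.list(key) // read from the store, cached
  const third = await cachedStore.list(key) // served from the cache
  const paged = await cachedStore.list(key, { size: 10 }) // different cache key
  return { first, second, third, paged, reads: store.stats.reads }
}
```

Because failed reads are never stored, a transient error cannot be replayed from the cache; the next call simply goes back to the store.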
1 change: 1 addition & 0 deletions billing/package.json
@@ -15,6 +15,7 @@
"@ucanto/server": "^10.0.0",
"@web3-storage/capabilities": "^17.1.1",
"big.js": "^6.2.1",
+"lru-cache": "^11.0.0",
"multiformats": "^13.1.0",
"p-retry": "^6.2.0",
"stripe": "^14.2.0",
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default.

7 changes: 5 additions & 2 deletions test/filecoin.test.js
@@ -234,8 +234,11 @@ test('w3filecoin integration flow', async t => {
await waitForStoreOperationOkResult(
async () => {
// Trigger cron to update and issue receipts based on deals
-const callDealerCronRes = await pRetry(() => fetch(`https://staging.dealer.web3.storage/cron`))
-t.true(callDealerCronRes.ok)
+await pRetry(async () => {
+const url = 'https://staging.dealer.web3.storage/cron'
+const res = await fetch(url)
+if (!res.ok) throw new Error(`failed request to ${url}: ${res.status}`)
+}, { onFailedAttempt: console.warn })

return receiptStoreFilecoin.get(aggregateAcceptReceiptCid)
// return agentStoreFilecoin.receipts.get(aggregateAcceptReceiptCid)
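
The test change above matters because `fetch` resolves, rather than rejects, when the server returns an HTTP error status, so a retry wrapper never sees a failure unless the callback throws. The sketch below illustrates this under assumptions: `retry` is a hypothetical minimal helper standing in for `p-retry` (no backoff, not its real implementation), and `createFlakyFetch` and the URL are made up for the example.

```javascript
// Hypothetical minimal retry helper standing in for `p-retry`: it re-runs
// `fn` when it throws, allowing up to `retries` extra attempts.
const retry = async (fn, { retries = 3, onFailedAttempt = () => {} } = {}) => {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      onFailedAttempt(err)
      if (attempt > retries) throw err
    }
  }
}

// Fake fetch: responds 503 twice, then 200. Like the real `fetch`, it
// resolves (does not throw) for an HTTP error status.
const createFlakyFetch = () => {
  let calls = 0
  return async (url) => {
    calls++
    return calls < 3 ? { ok: false, status: 503 } : { ok: true, status: 200 }
  }
}

// New behaviour: throwing on a non-ok response makes each 503 a retryable
// failure. Returns the messages reported through `onFailedAttempt`.
const callCron = async (fetchImpl, url) => {
  const failures = []
  await retry(async () => {
    const res = await fetchImpl(url)
    if (!res.ok) throw new Error(`failed request to ${url}: ${res.status}`)
  }, { onFailedAttempt: err => failures.push(err.message) })
  return failures
}

// Old behaviour: the non-ok response is returned after a single attempt,
// because nothing was thrown for the retry helper to catch.
const callCronWithoutCheck = (fetchImpl, url) => retry(() => fetchImpl(url))
```

With the status check inside the retried callback, a temporarily unavailable endpoint is retried instead of failing the assertion on the first non-ok response.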