diff --git a/billing/functions/ucan-stream.js b/billing/functions/ucan-stream.js index 2ac90960..5704de09 100644 --- a/billing/functions/ucan-stream.js +++ b/billing/functions/ucan-stream.js @@ -1,6 +1,7 @@ import * as Sentry from '@sentry/serverless' import { toString, fromString } from 'uint8arrays' import * as Link from 'multiformats/link' +import { LRUCache } from 'lru-cache' import { createSpaceDiffStore } from '../tables/space-diff.js' import { createConsumerStore } from '../tables/consumer.js' import { expect } from './lib.js' @@ -45,10 +46,9 @@ export const handler = Sentry.AWSLambda.wrapHandler( } console.log("Storing space usage delta", deltas[0]) - const ctx = { - spaceDiffStore: createSpaceDiffStore({ region }, { tableName: spaceDiffTable }), - consumerStore: createConsumerStore({ region }, { tableName: consumerTable }) - } + const consumerStore = createConsumerStore({ region }, { tableName: consumerTable }) + const spaceDiffStore = createSpaceDiffStore({ region }, { tableName: spaceDiffTable }) + const ctx = { spaceDiffStore, consumerStore: withConsumerListCache(consumerStore) } expect( await storeSpaceUsageDelta(deltas[0], ctx), `storing space usage delta for: ${deltas[0].resource}, cause: ${deltas[0].cause}` @@ -85,3 +85,38 @@ const parseUcanStreamEvent = event => { } }) } + +/** + * This means that if a subscription for a space changes, there's a 5 minute + * (max) period where writes may be attributed to the previous subscription. + * + * This happens very infrequently, and DynamoDB is _already_ eventually + * consistent on read so we're just pushing out this delay a little more to + * be able to process data for spaces with frequent writes a lot quicker. + */ +const CONSUMER_LIST_CACHE_TTL = 1000 * 60 * 5 +const CONSUMER_LIST_CACHE_MAX = 10_000 + +/** + * @param {import('../lib/api').ConsumerStore} consumerStore + * @returns {import('../lib/api').ConsumerStore} + */ +const withConsumerListCache = (consumerStore) => { + /** @type {LRUCache>>} */ + const cache = new LRUCache({ + max: CONSUMER_LIST_CACHE_MAX, + ttl: CONSUMER_LIST_CACHE_TTL + }) + return { + ...consumerStore, + async list (key, options) { + const cacheKeySuffix = options ? `?cursor=${options.cursor}&size=${options.size}` : '' + const cacheKey = `${key.consumer}${cacheKeySuffix}` + const cached = cache.get(cacheKey) + if (cached) return cached + const res = await consumerStore.list(key, options) + if (res.ok) cache.set(cacheKey, res) + return res + } + } +} diff --git a/billing/package.json b/billing/package.json index d736ad64..b0a96027 100644 --- a/billing/package.json +++ b/billing/package.json @@ -15,6 +15,7 @@ "@ucanto/server": "^10.0.0", "@web3-storage/capabilities": "^17.1.1", "big.js": "^6.2.1", + "lru-cache": "^11.0.0", "multiformats": "^13.1.0", "p-retry": "^6.2.0", "stripe": "^14.2.0", diff --git a/package-lock.json b/package-lock.json index 51316929..1e5a3289 100644 --- a/package-lock.json +++ b/package-lock.json @@ -80,6 +80,7 @@ "@ucanto/server": "^10.0.0", "@web3-storage/capabilities": "^17.1.1", "big.js": "^6.2.1", + "lru-cache": "^11.0.0", "multiformats": "^13.1.0", "p-retry": "^6.2.0", "stripe": "^14.2.0", @@ -99,6 +100,14 @@ "node": ">=16.15" } }, + "billing/node_modules/lru-cache": { + "version": "11.0.0", + "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-11.0.0.tgz", + "integrity": "sha512-Qv32eSV1RSCfhY3fpPE2GNZ8jgM9X7rdAfemLWqTUxwiyIC4jJ6Sy0fZ8H+oLWevO6i4/bizg7c8d8i6bxrzbA==", + "engines": { + "node": "20 || >=22" + } + }, "carpark": { "name": "@web3-storage/w3infra-carpark", "version": "0.0.0", diff --git a/test/filecoin.test.js b/test/filecoin.test.js index 9f829c72..4f37bfbb 100644 --- a/test/filecoin.test.js +++ b/test/filecoin.test.js @@ -234,8 +234,11 @@ test('w3filecoin integration flow', async t => { await waitForStoreOperationOkResult( async () => { // Trigger cron to update and issue receipts based on deals - const callDealerCronRes = await pRetry(() => fetch(`https://staging.dealer.web3.storage/cron`)) - t.true(callDealerCronRes.ok) + await pRetry(async () => { + const url = 'https://staging.dealer.web3.storage/cron' + const res = await fetch(url) + if (!res.ok) throw new Error(`failed request to ${url}: ${res.status}`) + }, { onFailedAttempt: console.warn }) return receiptStoreFilecoin.get(aggregateAcceptReceiptCid) // return agentStoreFilecoin.receipts.get(aggregateAcceptReceiptCid)