From 062dfcbc7af491c996c1db0bf5786a14435753c4 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 10 May 2022 18:05:50 +0200 Subject: [PATCH] fix: integrate edge gateway in api until workers bindings available --- packages/api/package.json | 2 +- packages/api/src/env.js | 7 ++ packages/api/src/perma-cache/post.js | 18 ++- packages/api/test/perma-cache-post.spec.js | 1 - packages/api/wrangler.toml | 9 ++ packages/edge-gateway/src/gateway.js | 131 +++++++++++++++------ pnpm-lock.yaml | 2 +- 7 files changed, 126 insertions(+), 44 deletions(-) diff --git a/packages/api/package.json b/packages/api/package.json index 9b6b85e..74e1556 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -22,6 +22,7 @@ }, "dependencies": { "@supabase/postgrest-js": "^0.37.2", + "edge-gateway": "^1.5.5", "itty-router": "^2.4.5", "multiformats": "^9.6.4", "nanoid": "^3.1.30", @@ -33,7 +34,6 @@ "ava": "^3.15.0", "browser-env": "^3.3.0", "dotenv": "^16.0.0", - "edge-gateway": "^1.5.5", "esbuild": "^0.14.2", "execa": "^5.1.1", "git-rev-sync": "^3.0.1", diff --git a/packages/api/src/env.js b/packages/api/src/env.js index a2cf3a1..2ff3854 100644 --- a/packages/api/src/env.js +++ b/packages/api/src/env.js @@ -56,6 +56,13 @@ export function envAll(request, env, ctx) { env.log = new Logging(request, env, ctx) env.log.time('request') + + /** + * Add gateway environment + * will be removed once workers bindings are in place. + */ + // @ts-ignore types not complete - Special inputs for Env + env.ipfsGateways = JSON.parse(env.IPFS_GATEWAYS) } /** diff --git a/packages/api/src/perma-cache/post.js b/packages/api/src/perma-cache/post.js index fa7de17..e016687 100644 --- a/packages/api/src/perma-cache/post.js +++ b/packages/api/src/perma-cache/post.js @@ -6,6 +6,8 @@ import { InvalidUrlError, TimeoutError, HTTPError } from '../errors.js' import { JSONResponse } from '../utils/json-response.js' import { normalizeCid } from '../utils/cid.js' +import { gatewayIpfs } from 'edge-gateway/src/gateway.js' + /** * @typedef {import('../env').Env} Env * @typedef {{ userId: string, r2Key: string, date: string }} Key @@ -21,8 +23,9 @@ import { normalizeCid } from '../utils/cid.js' * * @param {Request} request * @param {Env} env + * @param {import('..').Ctx} ctx */ -export async function permaCachePost(request, env) { +export async function permaCachePost(request, env, ctx) { const sourceUrl = getSourceUrl(request, env) const normalizedUrl = getNormalizedUrl(sourceUrl, env) const r2Key = normalizedUrl.toString() @@ -47,7 +50,7 @@ export async function permaCachePost(request, env) { if (!r2Object) { // Fetch Response from provided URL - const response = await getResponse(request, env, normalizedUrl) + const response = await getResponse(request, env, ctx, normalizedUrl) if (!response.ok) { throw new HTTPError( 'Failed to get response from provided URL', @@ -81,9 +84,12 @@ export async function permaCachePost(request, env) { * Fetch Response from provided URL. * @param {Request} request * @param {Env} env + * @param {import('..').Ctx} ctx * @param {URL} url */ -async function getResponse(request, env, url) { +async function getResponse(request, env, ctx, url) { + // TODO: Wait for CF services support + /* const controller = new AbortController() const timer = setTimeout(() => controller.abort(), env.REQUEST_TIMEOUT) let response @@ -101,6 +107,12 @@ async function getResponse(request, env, url) { clearTimeout(timer) } return response + */ + + request = new Request(url.toString()) + + // @ts-ignore Env does not match entirely + return await gatewayIpfs(request, env, ctx) } /** diff --git a/packages/api/test/perma-cache-post.spec.js b/packages/api/test/perma-cache-post.spec.js index 4946b78..088871b 100644 --- a/packages/api/test/perma-cache-post.spec.js +++ b/packages/api/test/perma-cache-post.spec.js @@ -194,7 +194,6 @@ const validateSuccessfulPut = async (t, url, body, responseTxt) => { const { normalizedUrl, sourceUrl } = getParsedUrl(url) t.is(body.url, sourceUrl) t.truthy(body.insertedAt) - t.falsy(body.deletedAt) t.truthy(body.size) // Validate DB diff --git a/packages/api/wrangler.toml b/packages/api/wrangler.toml index a6e3700..2c8316d 100644 --- a/packages/api/wrangler.toml +++ b/packages/api/wrangler.toml @@ -22,12 +22,16 @@ main = "worker.mjs" account_id = "fffa4b4363a7e5250af8357087263b3a" # Protocol Labs CF account zone_id = "c7795a0adce7609a95d62fec04705aff" # nftstorage.link zone route = "api.nftstorage.link/*" +kv_namespaces = [ + { binding = "DENYLIST", id = "785cf627e913468ca5319523ae929def" } +] [env.production.vars] DATABASE_URL = "https://nft-link-prod.herokuapp.com" DEBUG = "false" ENV = "production" GATEWAY_DOMAIN = "nftstorage.link" +IPFS_GATEWAYS = "[\"https://ipfs.io\", \"https://cf.nftstorage.link\", \"https://nft-storage.mypinata.cloud/\"]" [[env.production.r2_buckets]] bucket_name = "super-hot" @@ -39,12 +43,16 @@ binding = "SUPERHOT" account_id = "fffa4b4363a7e5250af8357087263b3a" # Protocol Labs CF account zone_id = "c7795a0adce7609a95d62fec04705aff" # nftstorage.link zone route = "api-staging.nftstorage.link/*" +kv_namespaces = [ + { binding = "DENYLIST", id = "f4eb0eca32e14e28b643604a82e00cb3" } +] [env.staging.vars] DATABASE_URL = "https://nft-link-staging.herokuapp.com" DEBUG = "true" ENV = "staging" GATEWAY_DOMAIN = "nftstorage.link" +IPFS_GATEWAYS = "[\"https://ipfs.io\", \"https://cf.nftstorage.link\", \"https://nft-storage.mypinata.cloud/\"]" [[env.staging.r2_buckets]] bucket_name = "super-hot-staging" @@ -58,6 +66,7 @@ workers_dev = true DEBUG = "true" ENV = "test" GATEWAY_DOMAIN = "localhost:9081" +IPFS_GATEWAYS = "[\"http://127.0.0.1:9081\"]" # Dev! [env.vsantos] diff --git a/packages/edge-gateway/src/gateway.js b/packages/edge-gateway/src/gateway.js index b41e154..7cde15b 100644 --- a/packages/edge-gateway/src/gateway.js +++ b/packages/edge-gateway/src/gateway.js @@ -12,7 +12,6 @@ import { toDenyListAnchor } from './utils/deny-list.js' import { CIDS_TRACKER_ID, SUMMARY_METRICS_ID, - GATEWAY_RATE_LIMIT_ID, REDIRECT_COUNTER_METRICS_ID, CF_CACHE_MAX_OBJECT_SIZE, HTTP_STATUS_RATE_LIMITED, @@ -29,6 +28,12 @@ import { * @property {boolean} [aborted] * * @typedef {import('./env').Env} Env + * @typedef {import('p-settle').PromiseResult} PromiseResultGatewayResponse + * + * @typedef {Object} IPFSResolutionOptions + * @property {(response: Response, responseTime: number) => void} [onCdnResolution] + * @property {(winnerGwResponse: GatewayResponse, gatewayReqs: Promise[], cid: string) => void} [onRaceResolution] + * @property {(gwResponses: PromiseResultGatewayResponse[], wasRateLimited: boolean) => void} [onRaceError] */ /** @@ -47,11 +52,57 @@ export async function gatewayGet(request, env, ctx) { ) } + return await gatewayIpfs(request, env, ctx, { + onCdnResolution: (res, responseTime) => { + ctx.waitUntil(updateSummaryCacheMetrics(request, env, res, responseTime)) + }, + onRaceResolution: (winnerGwResponse, gatewayReqs, cid) => { + ctx.waitUntil( + (async () => { + await Promise.all([ + storeWinnerGwResponse(request, env, winnerGwResponse), + settleGatewayRequests( + request, + env, + gatewayReqs, + winnerGwResponse.url, + cid + ), + ]) + })() + ) + }, + onRaceError: (gwResponses, wasRateLimited) => { + ctx.waitUntil( + (async () => { + // Update metrics as all requests failed + await Promise.all( + gwResponses.map((r) => + updateGatewayMetrics(request, env, r.value, false) + ) + ) + wasRateLimited && updateGatewayRedirectCounter(request, env) + })() + ) + }, + }) +} + +/** + * Perform edge gateway IPFS content resolution. + * + * @param {Request} request + * @param {Env} env + * @param {import('./index').Ctx} ctx + * @param {IPFSResolutionOptions} [options] + */ +export async function gatewayIpfs(request, env, ctx, options = {}) { const startTs = Date.now() const reqUrl = new URL(request.url) const cid = getCidFromSubdomainUrl(reqUrl) const pathname = reqUrl.pathname + // Validation layer if (env.DENYLIST) { const anchor = await toDenyListAnchor(cid) // TODO: Remove once https://github.com/nftstorage/nftstorage.link/issues/51 is fixed @@ -69,6 +120,7 @@ export async function gatewayGet(request, env, ctx) { } } + // 1st layer resolution - CDN const cache = caches.default const res = await cache.match(request.url) @@ -76,52 +128,36 @@ export async function gatewayGet(request, env, ctx) { // Update cache metrics in background const responseTime = Date.now() - startTs - ctx.waitUntil(updateSummaryCacheMetrics(request, env, res, responseTime)) + options.onCdnResolution && options.onCdnResolution(res, responseTime) return res } - // Prepare IPFS gateway requests + // 2nd layer resolution - Public Gateways race const gatewayReqs = env.ipfsGateways.map((gwUrl) => gatewayFetch(gwUrl, cid, request, { pathname, timeout: env.REQUEST_TIMEOUT, }) ) + try { /** @type {GatewayResponse} */ const winnerGwResponse = await pAny(gatewayReqs, { filter: (res) => res.response?.ok, }) - - async function settleGatewayRequests() { - // Wait for remaining responses - const responses = await pSettle(gatewayReqs) - const successFullResponses = responses.filter( - (r) => r.value?.response?.ok - ) - - await Promise.all([ - // Filter out winner and update remaining gateway metrics - ...responses - .filter((r) => r.value?.url !== winnerGwResponse.url) - .map((r) => updateGatewayMetrics(request, env, r.value, false)), - updateCidsTracker(request, env, successFullResponses, cid), - ]) - } - + options.onRaceResolution && + options.onRaceResolution(winnerGwResponse, gatewayReqs, cid) + // Cache response ctx.waitUntil( (async () => { const contentLengthMb = Number( winnerGwResponse.response.headers.get('content-length') ) - await Promise.all([ - storeWinnerGwResponse(request, env, winnerGwResponse), - settleGatewayRequests(), - // Cache request URL in Cloudflare CDN if smaller than CF_CACHE_MAX_OBJECT_SIZE - contentLengthMb <= CF_CACHE_MAX_OBJECT_SIZE && - cache.put(request.url, winnerGwResponse.response.clone()), - ]) + // Cache request URL in Cloudflare CDN if smaller than CF_CACHE_MAX_OBJECT_SIZE + if (contentLengthMb <= CF_CACHE_MAX_OBJECT_SIZE) { + await cache.put(request.url, winnerGwResponse.response.clone()) + } })() ) @@ -137,17 +173,7 @@ export async function gatewayGet(request, env, ctx) { r.value?.reason === REQUEST_PREVENTED_RATE_LIMIT_CODE ) - ctx.waitUntil( - (async () => { - // Update metrics as all requests failed - await Promise.all( - responses.map((r) => - updateGatewayMetrics(request, env, r.value, false) - ) - ) - wasRateLimited && updateGatewayRedirectCounter(request, env) - })() - ) + options.onRaceError && options.onRaceError(responses, wasRateLimited) if (wasRateLimited) { const ipfsUrl = new URL('ipfs', env.ipfsGateways[0]) @@ -176,6 +202,35 @@ export async function gatewayGet(request, env, ctx) { } } +/** + * Settle all gateway requests and update metrics. + * + * @param {Request} request + * @param {Env} env + * @param {Promise[]} gatewayReqs + * @param {string} winnerUrl + * @param {string} cid + */ +async function settleGatewayRequests( + request, + env, + gatewayReqs, + winnerUrl, + cid +) { + // Wait for remaining responses + const responses = await pSettle(gatewayReqs) + const successFullResponses = responses.filter((r) => r.value?.response?.ok) + + await Promise.all([ + // Filter out winner and update remaining gateway metrics + ...responses + .filter((r) => r.value?.url !== winnerUrl) + .map((r) => updateGatewayMetrics(request, env, r.value, false)), + updateCidsTracker(request, env, successFullResponses, cid), + ]) +} + /** * Store metrics for winner gateway response * diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 7342eff..60df5ee 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -47,6 +47,7 @@ importers: uint8arrays: ^3.0.0 dependencies: '@supabase/postgrest-js': 0.37.2 + edge-gateway: link:../edge-gateway itty-router: 2.6.1 multiformats: 9.6.4 nanoid: 3.3.3 @@ -57,7 +58,6 @@ importers: ava: 3.15.0 browser-env: 3.3.0 dotenv: 16.0.0 - edge-gateway: link:../edge-gateway esbuild: 0.14.38 execa: 5.1.1 git-rev-sync: 3.0.2