Skip to content

Commit

Permalink
fix: integrate edge gateway in api until workers bindings available
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed May 11, 2022
1 parent f3e107c commit 0137a42
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 47 deletions.
2 changes: 1 addition & 1 deletion packages/api/package.json
Expand Up @@ -22,6 +22,7 @@
},
"dependencies": {
"@supabase/postgrest-js": "^0.37.2",
"edge-gateway": "^1.5.5",
"itty-router": "^2.4.5",
"multiformats": "^9.6.4",
"nanoid": "^3.1.30",
Expand All @@ -33,7 +34,6 @@
"ava": "^3.15.0",
"browser-env": "^3.3.0",
"dotenv": "^16.0.0",
"edge-gateway": "^1.5.5",
"esbuild": "^0.14.2",
"execa": "^5.1.1",
"git-rev-sync": "^3.0.1",
Expand Down
7 changes: 7 additions & 0 deletions packages/api/src/env.js
Expand Up @@ -56,6 +56,13 @@ export function envAll(request, env, ctx) {

env.log = new Logging(request, env, ctx)
env.log.time('request')

/**
* Add gateway environment
* will be removed once workers bindings are in place.
*/
// @ts-ignore types not complete - Special inputs for Env
env.ipfsGateways = JSON.parse(env.IPFS_GATEWAYS)
}

/**
Expand Down
18 changes: 15 additions & 3 deletions packages/api/src/perma-cache/post.js
Expand Up @@ -7,6 +7,8 @@ import { JSONResponse } from '../utils/json-response.js'
import { normalizeCid } from '../utils/cid.js'
import { encodeKey } from './utils.js'

import { gatewayIpfs } from 'edge-gateway/src/gateway.js'

/**
* @typedef {import('../env').Env} Env
* @typedef {{ userId: string, r2Key: string, date: string }} Key
Expand All @@ -22,8 +24,9 @@ import { encodeKey } from './utils.js'
*
* @param {Request} request
* @param {Env} env
* @param {import('..').Ctx} ctx
*/
export async function permaCachePost(request, env) {
export async function permaCachePost(request, env, ctx) {
const sourceUrl = getSourceUrl(request, env)
const normalizedUrl = getNormalizedUrl(sourceUrl, env)
const r2Key = normalizedUrl.toString()
Expand All @@ -49,7 +52,7 @@ export async function permaCachePost(request, env) {

if (!r2Object) {
// Fetch Response from provided URL
const response = await getResponse(request, env, normalizedUrl)
const response = await getResponse(request, env, ctx, normalizedUrl)
if (!response.ok) {
throw new HTTPError(
'Failed to get response from provided URL',
Expand Down Expand Up @@ -100,9 +103,12 @@ export async function permaCachePost(request, env) {
* Fetch Response from provided URL.
* @param {Request} request
* @param {Env} env
* @param {import('..').Ctx} ctx
* @param {URL} url
*/
async function getResponse(request, env, url) {
async function getResponse(request, env, ctx, url) {
// TODO: Wait for CF services support
/*
const controller = new AbortController()
const timer = setTimeout(() => controller.abort(), env.REQUEST_TIMEOUT)
let response
Expand All @@ -120,6 +126,12 @@ async function getResponse(request, env, url) {
clearTimeout(timer)
}
return response
*/

request = new Request(url.toString())

// @ts-ignore Env does not match entirely
return await gatewayIpfs(request, env, ctx)
}

/**
Expand Down
1 change: 0 additions & 1 deletion packages/api/test/perma-cache-post.spec.js
Expand Up @@ -174,7 +174,6 @@ const validateSuccessfulPut = async (t, url, body, responseTxt) => {
const { normalizedUrl, sourceUrl } = getParsedUrl(url)
t.is(body.url, sourceUrl)
t.truthy(body.date)
t.falsy(body.deletedAt)
t.truthy(body.size)

// Validate KV
Expand Down
11 changes: 8 additions & 3 deletions packages/api/wrangler.toml
Expand Up @@ -24,14 +24,16 @@ zone_id = "c7795a0adce7609a95d62fec04705aff" # nftstorage.link zone
route = "api.nftstorage.link/*"
kv_namespaces = [
{ binding = "PERMACACHE", id = "73fa71d7cadc4671b91a236cee82d502", preview_id = "73fa71d7cadc4671b91a236cee82d502" },
{ binding = "PERMACACHE_HISTORY", id = "ed2c0aeb57eb4f72a7edf8e097efb3fa", preview_id = "ed2c0aeb57eb4f72a7edf8e097efb3fa" }
{ binding = "PERMACACHE_HISTORY", id = "ed2c0aeb57eb4f72a7edf8e097efb3fa", preview_id = "ed2c0aeb57eb4f72a7edf8e097efb3fa" },
{ binding = "DENYLIST", id = "785cf627e913468ca5319523ae929def" }
]

[env.production.vars]
DATABASE_URL = "https://nft-storage-pgrest-prod.herokuapp.com"
DEBUG = "false"
ENV = "production"
GATEWAY_DOMAIN = "nftstorage.link"
IPFS_GATEWAYS = "[\"https://ipfs.io\", \"https://cf.nftstorage.link\", \"https://nft-storage.mypinata.cloud/\"]"

#[env.production.r2_buckets]
#binding = "SUPERHOT"
Expand All @@ -45,14 +47,16 @@ zone_id = "c7795a0adce7609a95d62fec04705aff" # nftstorage.link zone
route = "api-staging.nftstorage.link/*"
kv_namespaces = [
{ binding = "PERMACACHE", id = "de8e81e8107d4b2497dfa9e288f5dc8c", preview_id = "de8e81e8107d4b2497dfa9e288f5dc8c" },
{ binding = "PERMACACHE_HISTORY", id = "bac8069051ee4796a305b4d3f366b930", preview_id = "bac8069051ee4796a305b4d3f366b930" }
{ binding = "PERMACACHE_HISTORY", id = "bac8069051ee4796a305b4d3f366b930", preview_id = "bac8069051ee4796a305b4d3f366b930" },
{ binding = "DENYLIST", id = "f4eb0eca32e14e28b643604a82e00cb3" }
]

[env.staging.vars]
DATABASE_URL = "https://nft-storage-pgrest-staging.herokuapp.com"
DEBUG = "true"
ENV = "staging"
GATEWAY_DOMAIN = "nftstorage.link"
IPFS_GATEWAYS = "[\"https://ipfs.io\", \"https://cf.nftstorage.link\", \"https://nft-storage.mypinata.cloud/\"]"

[[env.staging.r2_buckets]]
bucket_name = "super-hot"
Expand All @@ -61,12 +65,13 @@ binding = "SUPERHOT"
# Test!
[env.test]
workers_dev = true
kv_namespaces = [{ binding = "PERMACACHE" }, { binding = "PERMACACHE_HISTORY" }]
kv_namespaces = [{ binding = "PERMACACHE" }, { binding = "PERMACACHE_HISTORY" }, { binding = "DENYLIST" }]

[env.test.vars]
DEBUG = "true"
ENV = "test"
GATEWAY_DOMAIN = "localhost:9081"
IPFS_GATEWAYS = "[\"http://127.0.0.1:9081\"]"

# Dev!
[env.vsantos]
Expand Down
131 changes: 93 additions & 38 deletions packages/edge-gateway/src/gateway.js
Expand Up @@ -12,7 +12,6 @@ import { toDenyListAnchor } from './utils/deny-list.js'
import {
CIDS_TRACKER_ID,
SUMMARY_METRICS_ID,
GATEWAY_RATE_LIMIT_ID,
REDIRECT_COUNTER_METRICS_ID,
CF_CACHE_MAX_OBJECT_SIZE,
HTTP_STATUS_RATE_LIMITED,
Expand All @@ -29,6 +28,12 @@ import {
* @property {boolean} [aborted]
*
* @typedef {import('./env').Env} Env
* @typedef {import('p-settle').PromiseResult<GatewayResponse>} PromiseResultGatewayResponse
*
* @typedef {Object} IPFSResolutionOptions
* @property {(response: Response, responseTime: number) => void} [onCdnResolution]
* @property {(winnerGwResponse: GatewayResponse, gatewayReqs: Promise<GatewayResponse>[], cid: string) => void} [onRaceResolution]
* @property {(gwResponses: PromiseResultGatewayResponse[], wasRateLimited: boolean) => void} [onRaceError]
*/

/**
Expand All @@ -47,11 +52,57 @@ export async function gatewayGet(request, env, ctx) {
)
}

return await gatewayIpfs(request, env, ctx, {
onCdnResolution: (res, responseTime) => {
ctx.waitUntil(updateSummaryCacheMetrics(request, env, res, responseTime))
},
onRaceResolution: (winnerGwResponse, gatewayReqs, cid) => {
ctx.waitUntil(
(async () => {
await Promise.all([
storeWinnerGwResponse(request, env, winnerGwResponse),
settleGatewayRequests(
request,
env,
gatewayReqs,
winnerGwResponse.url,
cid
),
])
})()
)
},
onRaceError: (gwResponses, wasRateLimited) => {
ctx.waitUntil(
(async () => {
// Update metrics as all requests failed
await Promise.all(
gwResponses.map((r) =>
updateGatewayMetrics(request, env, r.value, false)
)
)
wasRateLimited && updateGatewayRedirectCounter(request, env)
})()
)
},
})
}

/**
* Perform edge gateway IPFS content resolution.
*
* @param {Request} request
* @param {Env} env
* @param {import('./index').Ctx} ctx
* @param {IPFSResolutionOptions} [options]
*/
export async function gatewayIpfs(request, env, ctx, options = {}) {
const startTs = Date.now()
const reqUrl = new URL(request.url)
const cid = getCidFromSubdomainUrl(reqUrl)
const pathname = reqUrl.pathname

// Validation layer
if (env.DENYLIST) {
const anchor = await toDenyListAnchor(cid)
// TODO: Remove once https://github.com/nftstorage/nftstorage.link/issues/51 is fixed
Expand All @@ -69,59 +120,44 @@ export async function gatewayGet(request, env, ctx) {
}
}

// 1st layer resolution - CDN
const cache = caches.default
const res = await cache.match(request.url)

if (res) {
// Update cache metrics in background
const responseTime = Date.now() - startTs

ctx.waitUntil(updateSummaryCacheMetrics(request, env, res, responseTime))
options.onCdnResolution && options.onCdnResolution(res, responseTime)
return res
}

// Prepare IPFS gateway requests
// 2nd layer resolution - Public Gateways race
const gatewayReqs = env.ipfsGateways.map((gwUrl) =>
gatewayFetch(gwUrl, cid, request, {
pathname,
timeout: env.REQUEST_TIMEOUT,
})
)

try {
/** @type {GatewayResponse} */
const winnerGwResponse = await pAny(gatewayReqs, {
filter: (res) => res.response?.ok,
})

async function settleGatewayRequests() {
// Wait for remaining responses
const responses = await pSettle(gatewayReqs)
const successFullResponses = responses.filter(
(r) => r.value?.response?.ok
)

await Promise.all([
// Filter out winner and update remaining gateway metrics
...responses
.filter((r) => r.value?.url !== winnerGwResponse.url)
.map((r) => updateGatewayMetrics(request, env, r.value, false)),
updateCidsTracker(request, env, successFullResponses, cid),
])
}

options.onRaceResolution &&
options.onRaceResolution(winnerGwResponse, gatewayReqs, cid)
// Cache response
ctx.waitUntil(
(async () => {
const contentLengthMb = Number(
winnerGwResponse.response.headers.get('content-length')
)

await Promise.all([
storeWinnerGwResponse(request, env, winnerGwResponse),
settleGatewayRequests(),
// Cache request URL in Cloudflare CDN if smaller than CF_CACHE_MAX_OBJECT_SIZE
contentLengthMb <= CF_CACHE_MAX_OBJECT_SIZE &&
cache.put(request.url, winnerGwResponse.response.clone()),
])
// Cache request URL in Cloudflare CDN if smaller than CF_CACHE_MAX_OBJECT_SIZE
if (contentLengthMb <= CF_CACHE_MAX_OBJECT_SIZE) {
await cache.put(request.url, winnerGwResponse.response.clone())
}
})()
)

Expand All @@ -137,17 +173,7 @@ export async function gatewayGet(request, env, ctx) {
r.value?.reason === REQUEST_PREVENTED_RATE_LIMIT_CODE
)

ctx.waitUntil(
(async () => {
// Update metrics as all requests failed
await Promise.all(
responses.map((r) =>
updateGatewayMetrics(request, env, r.value, false)
)
)
wasRateLimited && updateGatewayRedirectCounter(request, env)
})()
)
options.onRaceError && options.onRaceError(responses, wasRateLimited)

if (wasRateLimited) {
const ipfsUrl = new URL('ipfs', env.ipfsGateways[0])
Expand Down Expand Up @@ -176,6 +202,35 @@ export async function gatewayGet(request, env, ctx) {
}
}

/**
* Settle all gateway requests and update metrics.
*
* @param {Request} request
* @param {Env} env
* @param {Promise<GatewayResponse>[]} gatewayReqs
* @param {string} winnerUrl
* @param {string} cid
*/
async function settleGatewayRequests(
request,
env,
gatewayReqs,
winnerUrl,
cid
) {
// Wait for remaining responses
const responses = await pSettle(gatewayReqs)
const successFullResponses = responses.filter((r) => r.value?.response?.ok)

await Promise.all([
// Filter out winner and update remaining gateway metrics
...responses
.filter((r) => r.value?.url !== winnerUrl)
.map((r) => updateGatewayMetrics(request, env, r.value, false)),
updateCidsTracker(request, env, successFullResponses, cid),
])
}

/**
* Store metrics for winner gateway response
*
Expand Down
2 changes: 1 addition & 1 deletion pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0137a42

Please sign in to comment.