Skip to content

Commit

Permalink
feat: gateway tracking requested cids in database
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Feb 21, 2022
1 parent df96ce0 commit 2c437ae
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 69 deletions.
1 change: 1 addition & 0 deletions packages/gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"itty-router": "^2.4.5",
"multiformats": "^9.6.4",
"nanoid": "^3.1.30",
"nft.storage": "^6.0.0",
"p-any": "^4.0.0",
"p-map": "^5.3.0",
"p-settle": "^5.0.0",
Expand Down
220 changes: 184 additions & 36 deletions packages/gateway/src/durable-objects/summary-metrics.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
import { NFTStorage } from 'nft.storage'
import { PinStatusMap } from '../utils/pins.js'
import {
responseTimeHistogram,
createResponseTimeHistogramObject,
} from '../utils/histogram.js'

/**
* @typedef {'Stored'|'NotStored'} ContentStatus
* @typedef {import('../utils/pins').PinStatus} PinStatus
*
* @typedef {Object} SummaryMetrics
* @property {number} totalWinnerResponseTime total response time of the requests
* @property {number} totalWinnerSuccessfulRequests total number of successful requests
* @property {number} totalCachedResponseTime total response time to forward cached responses
* @property {number} totalCachedResponses total number of cached responses
* @property {number} totalErroredResponsesWithKnownContent total responses errored with content in NFT.storage
* @property {BigInt} totalContentLengthBytes total content length of responses
* @property {BigInt} totalCachedContentLengthBytes total content length of cached responses
* @property {Record<ContentStatus, number>} totalResponsesByContentStatus
* @property {Record<PinStatus, number>} totalResponsesByPinStatus
* @property {Record<string, number>} contentLengthHistogram
* @property {Record<string, number>} responseTimeHistogram
* @property {Record<PinStatus, Record<string, number>>} responseTimeHistogramByPinStatus
*
* @typedef {Object} FetchStats
* @property {number} responseTime number of milliseconds to get response
* @property {number} contentLength content length header content
* @property {string} cid fetched CID
* @property {boolean} errored fetched CID request errored
* @property {number} [responseTime] number of milliseconds to get response
* @property {number} [contentLength] content length header content
*/

// Key to track total time for winner gateway to respond
Expand All @@ -27,6 +38,9 @@ const TOTAL_WINNER_SUCCESSFUL_REQUESTS_ID = 'totalWinnerSuccessfulRequests'
const TOTAL_CACHED_RESPONSE_TIME_ID = 'totalCachedResponseTime'
// Key to track total cached requests
const TOTAL_CACHED_RESPONSES_ID = 'totalCachedResponses'
// Key to track total errored requests with known content
const TOTAL_ERRORED_RESPONSES_WITH_KNOWN_CONTENT_ID =
'totalErroredResponsesWithKnownContent'
// Key to track total content length of responses
const TOTAL_CONTENT_LENGTH_BYTES_ID = 'totalContentLengthBytes'
// Key to track total cached content length of responses
Expand All @@ -35,13 +49,22 @@ const TOTAL_CACHED_CONTENT_LENGTH_BYTES_ID = 'totalCachedContentLengthBytes'
const CONTENT_LENGTH_HISTOGRAM_ID = 'contentLengthHistogram'
// Key to track response time histogram
const RESPONSE_TIME_HISTOGRAM_ID = 'responseTimeHistogram'
// Key to track response time histogram by pin status
const RESPONSE_TIME_HISTOGRAM_BY_PIN_STATUS_ID =
'responseTimeHistogramByPinStatus'
// Key to track responses by content status
const TOTAL_RESPONSES_BY_CONTENT_STATUS_ID = 'totalResponsesByContentStatus'
// Key to track responses by pin status
const TOTAL_RESPONSES_BY_PIN_STATUS_ID = 'totalResponsesByPinStatus'

/**
* Durable Object for keeping summary metrics of gateway.nft.storage
*/
export class SummaryMetrics0 {
constructor(state) {
this.state = state
// @ts-ignore we don't need token just for check
this.nftStorageClient = new NFTStorage({})

// `blockConcurrencyWhile()` ensures no requests are delivered until initialization completes.
this.state.blockConcurrencyWhile(async () => {
Expand All @@ -57,22 +80,40 @@ export class SummaryMetrics0 {
// Total cached requests
this.totalCachedResponses =
(await this.state.storage.get(TOTAL_CACHED_RESPONSES_ID)) || 0
// Total content length responses
// Total errored requests with known content
this.totalErroredResponsesWithKnownContent =
(await this.state.storage.get(
TOTAL_ERRORED_RESPONSES_WITH_KNOWN_CONTENT_ID
)) || 0
/** @type {BigInt} */
this.totalContentLengthBytes =
(await this.state.storage.get(TOTAL_CONTENT_LENGTH_BYTES_ID)) ||
BigInt(0)
// Total cached content length responses
/** @type {BigInt} */
this.totalCachedContentLengthBytes =
(await this.state.storage.get(TOTAL_CACHED_CONTENT_LENGTH_BYTES_ID)) ||
BigInt(0)
// Content length histogram
/** @type {Record<ContentStatus, number>} */
this.totalResponsesByContentStatus =
(await this.state.storage.get(TOTAL_RESPONSES_BY_CONTENT_STATUS_ID)) ||
createResponsesByContentStatusObject()
/** @type {Record<PinStatus, number>} */
this.totalResponsesByPinStatus =
(await this.state.storage.get(TOTAL_RESPONSES_BY_PIN_STATUS_ID)) ||
createResponsesByPinStatusObject()
/** @type {Record<string, number>} */
this.contentLengthHistogram =
(await this.state.storage.get(CONTENT_LENGTH_HISTOGRAM_ID)) ||
createContentLengthHistogramObject()
// Response time histogram
/** @type {Record<string, number>} */
this.responseTimeHistogram =
(await this.state.storage.get(RESPONSE_TIME_HISTOGRAM_ID)) ||
createResponseTimeHistogramObject()
/** @type {Record<PinStatus, Record<string, number>>} */
this.responseTimeHistogramByPinStatus =
(await this.state.storage.get(
RESPONSE_TIME_HISTOGRAM_BY_PIN_STATUS_ID
)) || createResponseTimeHistogramByPinStatusObject()
})
}

Expand All @@ -91,11 +132,17 @@ export class SummaryMetrics0 {
totalWinnerSuccessfulRequests: this.totalWinnerSuccessfulRequests,
totalCachedResponseTime: this.totalCachedResponseTime,
totalCachedResponses: this.totalCachedResponses,
totalErroredResponsesWithKnownContent:
this.totalErroredResponsesWithKnownContent,
totalContentLengthBytes: this.totalContentLengthBytes.toString(),
totalCachedContentLengthBytes:
this.totalCachedContentLengthBytes.toString(),
totalResponsesByContentStatus: this.totalResponsesByContentStatus,
totalResponsesByPinStatus: this.totalResponsesByPinStatus,
contentLengthHistogram: this.contentLengthHistogram,
responseTimeHistogram: this.responseTimeHistogram,
responseTimeHistogramByPinStatus:
this.responseTimeHistogramByPinStatus,
})
)
default:
Expand All @@ -106,15 +153,90 @@ export class SummaryMetrics0 {
// POST
/** @type {FetchStats} */
const data = await request.json()
await Promise.all([
this._updateStatsMetrics(data, url),
this._updateCidMetrics(data),
])

return new Response()
}

/**
* @param {FetchStats} data
*/
async _updateCidMetrics({ cid, errored, responseTime }) {
try {
const statusRes = await this.nftStorageClient.check(cid)

if (errored) {
this.totalErroredResponsesWithKnownContent += 1

await this.state.storage.put(
TOTAL_ERRORED_RESPONSES_WITH_KNOWN_CONTENT_ID,
this.totalErroredResponsesWithKnownContent
)
} else {
this.totalResponsesByContentStatus['Stored'] += 1

const pinStatus = PinStatusMap[statusRes.pin?.status]
if (pinStatus) {
this.totalResponsesByPinStatus[pinStatus] += 1
this.responseTimeHistogramByPinStatus[pinStatus] =
getUpdatedHistogram(
this.responseTimeHistogramByPinStatus[pinStatus],
responseTimeHistogram,
responseTime
)
}

await Promise.all([
this.state.storage.put(
TOTAL_RESPONSES_BY_CONTENT_STATUS_ID,
this.totalResponsesByContentStatus
),
pinStatus &&
this.state.storage.put(
TOTAL_RESPONSES_BY_PIN_STATUS_ID,
this.totalResponsesByPinStatus
),
this.state.storage.put(
RESPONSE_TIME_HISTOGRAM_BY_PIN_STATUS_ID,
this.responseTimeHistogramByPinStatus
),
])
}
} catch (err) {
if (err.message === 'NFT not found') {
// Update not existing CID
this.totalResponsesByContentStatus['NotStored'] += 1

await this.state.storage.put(
TOTAL_RESPONSES_BY_CONTENT_STATUS_ID,
this.totalResponsesByContentStatus
)
}
}
}

/**
* @param {FetchStats} stats
* @param {URL} url
*/
async _updateStatsMetrics(stats, url) {
// Errored does not have winner/cache metrics to update
if (stats.errored) {
return
}

switch (url.pathname) {
case '/metrics/winner':
await this._updateWinnerMetrics(data)
return new Response()
await this._updateWinnerMetrics(stats)
break
case '/metrics/cache':
await this._updatedCacheMetrics(data)
return new Response()
await this._updatedCacheMetrics(stats)
break
default:
return new Response('Not found', { status: 404 })
throw new Error('Not found')
}
}

Expand Down Expand Up @@ -196,49 +318,75 @@ export class SummaryMetrics0 {
*/
_updateContentLengthMetrics(stats) {
this.totalContentLengthBytes += BigInt(stats.contentLength)

// Update histogram
const tmpHistogram = {
...this.contentLengthHistogram,
}

// Get all the histogram buckets where the content size is smaller
const histogramCandidates = contentLengthHistogram.filter(
(h) => stats.contentLength < h
this.contentLengthHistogram = getUpdatedHistogram(
this.contentLengthHistogram,
contentLengthHistogram,
stats.contentLength
)
histogramCandidates.forEach((candidate) => {
tmpHistogram[candidate] += 1
})

this.contentLengthHistogram = tmpHistogram
}

/**
* @param {FetchStats} stats
*/
_updateResponseTimeHistogram(stats) {
const tmpHistogram = {
...this.responseTimeHistogram,
}

// Get all the histogram buckets where the response time is smaller
const histogramCandidates = responseTimeHistogram.filter(
(h) => stats.responseTime < h
this.responseTimeHistogram = getUpdatedHistogram(
this.responseTimeHistogram,
responseTimeHistogram,
stats.responseTime
)
}
}

histogramCandidates.forEach((candidate) => {
tmpHistogram[candidate] += 1
function getUpdatedHistogram(histogramData, histogramBuckets, value) {
const updatedHistogram = {
...histogramData,
}
// Update all the histogram buckets where the response time is smaller
histogramBuckets
.filter((h) => value < h)
.forEach((candidate) => {
updatedHistogram[candidate] += 1
})

this.responseTimeHistogram = tmpHistogram
}
return updatedHistogram
}

/**
* @return {Record<PinStatus, number>}
*/
function createResponsesByPinStatusObject() {
const e = Object.values(PinStatusMap).map((t) => [t, 0])
return Object.fromEntries(e)
}

/**
* @return {Record<ContentStatus, number>}
*/
function createResponsesByContentStatusObject() {
const e = contentStatus.map((t) => [t, 0])
return Object.fromEntries(e)
}

/**
* @return {Record<PinStatus, Record<string, number>>}
*/
function createResponseTimeHistogramByPinStatusObject() {
const pinStatusEntry = Object.values(PinStatusMap).map((t) => [
t,
createResponseTimeHistogramObject(),
])

return Object.fromEntries(pinStatusEntry)
}

function createContentLengthHistogramObject() {
const h = contentLengthHistogram.map((h) => [h, 0])
return Object.fromEntries(h)
}

// Either CID is stored in NFT.storage or not
export const contentStatus = ['Stored', 'NotStored']

// We will count occurences per bucket where content size is less or equal than bucket value
export const contentLengthHistogram = [
0.5, 1, 2, 5, 25, 50, 100, 500, 1000, 5000, 10000, 15000, 20000, 30000, 32000,
Expand Down

0 comments on commit 2c437ae

Please sign in to comment.