Skip to content

Commit

Permalink
feat: add metrics and filter subrequest errors (#11)
Browse files Browse the repository at this point in the history
- adds Metrics class for count of blocks, bytes and indexes
- re-work blockstore error handling and dont throw on subrequest errors
as it's expected

e.g.

```sh
$ wrangler tail --env staging
 ⛅️ wrangler 3.3.0
------------------
Successfully created tail, expires at 2023-07-20T22:14:26Z
Connected to hoverboard-staging, waiting for logs...
GET https://hoverboard-staging.dag.haus/p2p/REDACTED - Ok @ 20/07/2023, 17:15:49
  (log) {"msg":"peer:connect","peer":"12D3KooWHZA6kRakMtwtXTQFJ9f7HKBr9ggpNxuDxLkbVH2dpB8x"}
  (log) {"msg":"peer:disconnect","peer":"12D3KooWHZA6kRakMtwtXTQFJ9f7HKBr9ggpNxuDxLkbVH2dpB8x","blocks":11,"blocksR2":11,"blocksS3":0,"blocksCached":0,"blockBytes":8594021,"blockBytesR2":8594021,"blockBytesS3":0,"blockBytesCached":0,"indexes":22,"indexesCached":0}
```

License: MIT

Signed-off-by: Oli Evans <oli@protocol.ai>
  • Loading branch information
olizilla committed Jul 21, 2023
1 parent eeab589 commit 29b2fee
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 45 deletions.
36 changes: 27 additions & 9 deletions src/blocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ const CAR = 0x202
/**
* @param {Env} env
* @param {ExecutionContext} ctx
* @param {import('./metrics.js').Metrics} metrics
*/
export async function getBlockstore (env, ctx) {
const dynamo = new DynamoIndex(getDynamoClient(env), env.DYNAMO_TABLE, { maxEntries: 3, preferRegion: 'us-west-2' })
const index = new CachingIndex(dynamo, await caches.open(`dynamo:${env.DYNAMO_TABLE}`), ctx)
export async function getBlockstore (env, ctx, metrics) {
const dynamo = new DynamoIndex(getDynamoClient(env), env.DYNAMO_TABLE, metrics, { maxEntries: 3, preferRegion: 'us-west-2' })
const index = new CachingIndex(dynamo, await caches.open(`dynamo:${env.DYNAMO_TABLE}`), ctx, metrics)
const s3 = new DynamoBlockstore(index, getS3Clients(env))
const r2 = new DagHausBlockStore(index, env.CARPARK, s3)
const cached = new CachingBlockStore(r2, await caches.open('blockstore:bytes'), ctx)
const r2 = new DagHausBlockStore(index, env.CARPARK, s3, metrics)
const cached = new CachingBlockStore(r2, await caches.open('blockstore:bytes'), ctx, metrics)
return env.DENYLIST ? new DenyingBlockStore(env.DENYLIST, cached) : cached
}

Expand All @@ -46,11 +47,13 @@ export class CachingBlockStore {
* @param {Blockstore} blockstore
* @param {Cache} cache
* @param {ExecutionContext} ctx
* @param {import('./metrics.js').Metrics} metrics
*/
constructor (blockstore, cache, ctx) {
constructor (blockstore, cache, ctx, metrics) {
this.blockstore = blockstore
this.cache = cache
this.ctx = ctx
this.metrics = metrics
}

/**
Expand All @@ -71,7 +74,11 @@ export class CachingBlockStore {
const cached = await this.cache.match(key)
if (cached) {
const buff = await cached.arrayBuffer()
return new Uint8Array(buff)
const bytes = new Uint8Array(buff)
this.metrics.blocks++
this.metrics.blocksCached++
this.metrics.blockBytes += bytes.byteLength
this.metrics.blockBytesCached += bytes.byteLength
}
const res = await this.blockstore.get(cid)
if (res) {
Expand Down Expand Up @@ -101,11 +108,13 @@ export class DagHausBlockStore {
* @param {import('./s3/block-index.js').BlockIndex} dynamo
* @param {R2Bucket} carpark
* @param {import('./s3/blockstore.js').CarBlockstore} s3
* @param {import('./metrics.js').Metrics} metrics
*/
constructor (dynamo, carpark, s3) {
constructor (dynamo, carpark, s3, metrics) {
this.dynamo = dynamo
this.carpark = carpark
this.s3 = s3
this.metrics = metrics
}

/**
Expand All @@ -131,14 +140,23 @@ export class DagHausBlockStore {
const obj = await this.carpark.get(carKey, { range: { offset, length } })
if (obj) {
const buff = await obj.arrayBuffer()
return new Uint8Array(buff)
const bytes = new Uint8Array(buff)
this.metrics.blocks++
this.metrics.blockBytes += bytes.byteLength
this.metrics.blocksR2++
this.metrics.blockBytesR2 += bytes.byteLength
return bytes
}
}
}

// fallback to s3
const block = await this.s3.get(cid, idxEntries)
if (!block) return undefined
this.metrics.blocks++
this.metrics.blockBytes += block.bytes.byteLength
this.metrics.blocksS3++
this.metrics.blockBytesS3 += block.bytes.byteLength
return block.bytes
}
}
Expand Down
49 changes: 20 additions & 29 deletions src/libp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ export async function getPeerId (env) {
/**
* Setup our libp2p service
* @param {Env} env
* @param {import('./deny.js').Blockstore} blockstore
* @param {import('@libp2p/interface-peer-id').PeerId} peerId
* @param {import('cf-libp2p-ws-transport').WebSockets} transport
*/
export async function getLibp2p (env, blockstore, peerId, transport) {
export async function getLibp2p (env, transport) {
const peerId = await getPeerId(env)
const libp2p = await createLibp2p({
peerId,
addresses: { listen: [getListenAddr(env)] },
Expand All @@ -35,51 +34,43 @@ export async function getLibp2p (env, blockstore, peerId, transport) {
identify: identifyService()
}
})
return libp2p
}

// rough metrics as a staring point
let blocks = 0
let bytes = 0
let miss = 0

/**
* @param {import('libp2p').Libp2p} libp2p
* @param {import('./blocks.js').Blockstore} blockstore
* @param {(err: Error) => Promise<void>} onError
*/
export function enableBitswap (libp2p, blockstore, onError = async () => {}) {
const miniswap = new Miniswap({
async has (cid) {
try {
const res = await blockstore.has(cid)
return res
} catch (err) {
libp2p.stop()
throw err
await onError(asError(err))
return false
}
},
async get (cid) {
try {
const res = await blockstore.get(cid)
if (res) {
bytes += res.byteLength
blocks++
} else {
miss++
}
return res
} catch (err) {
libp2p.stop()
throw err
await onError(asError(err))
}
}
})

libp2p.addEventListener('peer:connect', (evt) => {
const remotePeer = evt.detail
console.log(JSON.stringify({ msg: 'peer:connect', peer: remotePeer.toString() }))
})
libp2p.addEventListener('peer:disconnect', (evt) => {
const remotePeer = evt.detail
console.log(JSON.stringify({ msg: 'peer:disconnect', peer: remotePeer.toString(), blocks, bytes, miss }))
})

libp2p.handle(BITSWAP_PROTOCOL, miniswap.handler)
}

return libp2p
/** @param {unknown} err */
function asError (err) {
if (err instanceof Error) {
return err
}
return new Error(`${err}`)
}

/**
Expand Down
22 changes: 22 additions & 0 deletions src/metrics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export class Metrics {
/** all blocks read (r2 + s3 + cache) */
blocks = 0
/** count of blocks read from R2 */
blocksR2 = 0
/** count of blocks read from S3 */
blocksS3 = 0
/** count of blocks read from Cache */
blocksCached = 0
/** total block bytes read (r2 + s3 + cache) */
blockBytes = 0
/** block bytes read from R2 */
blockBytesR2 = 0
/** block bytes read from S3 */
blockBytesS3 = 0
/** block bytes read from Cloudflare Cache */
blockBytesCached = 0
/** count of all index read operations (dynamo + cache) */
indexes = 0
/** count of indexes read from Cache */
indexesCached = 0
}
16 changes: 13 additions & 3 deletions src/s3/block-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,24 @@ import retry from 'p-retry'
export class DynamoIndex {
#client
#table
#metrics
#max
#preferRegion

/**
* @param {import('@aws-sdk/client-dynamodb').DynamoDBClient} client
* @param {string} table
* @param {import('../metrics.js').Metrics} metrics
* @param {object} [options]
* @param {number} [options.maxEntries] Max entries to return when multiple
* CAR files contain the same block.
* @param {string} [options.preferRegion] Preferred region to place first in
* results.
*/
constructor (client, table, options) {
constructor (client, table, metrics, options) {
this.#client = client
this.#table = table
this.#metrics = metrics
this.#max = options?.maxEntries ?? 5
this.#preferRegion = options?.preferRegion
}
Expand Down Expand Up @@ -60,6 +63,9 @@ export class DynamoIndex {
}
}
})
if (res.$metadata.httpStatusCode && res.$metadata.httpStatusCode >= 200 && res.$metadata.httpStatusCode < 300) {
this.#metrics.indexes++
}
const items = (res.Items ?? []).map(item => {
const { carpath, offset, length } = unmarshall(item)
const [region, bucket, ...rest] = carpath.split('/')
Expand All @@ -82,11 +88,13 @@ export class CachingIndex {
* @param {BlockIndex} index
* @param {Cache} cache
* @param {ExecutionContext} ctx
* @param {import('../metrics.js').Metrics} metrics
*/
constructor (index, cache, ctx) {
constructor (index, cache, ctx, metrics) {
this.index = index
this.cache = cache
this.ctx = ctx
this.metrics = metrics
}

/**
Expand All @@ -97,11 +105,13 @@ export class CachingIndex {
const key = this.toCacheKey(cid)
const cached = await this.cache.match(key)
if (cached) {
console.log('cached index', key.url)
this.metrics.indexes++
this.metrics.indexesCached++
return cached.json()
}
const res = await this.index.get(cid)
if (res.length > 0) {
this.metrics.indexes++
this.ctx.waitUntil(this.cache.put(key, new Response(JSON.stringify(res))))
}
return res
Expand Down
25 changes: 21 additions & 4 deletions src/worker.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
/* eslint-env serviceworker */
import toMultiaddr from '@multiformats/uri-to-multiaddr'
import { WebSockets } from 'cf-libp2p-ws-transport'
import { getLibp2p, getPeerId, getWebSocketListener } from './libp2p.js'
import { enableBitswap, getLibp2p, getPeerId, getWebSocketListener } from './libp2p.js'
import { getBlockstore } from './blocks.js'
import { version } from '../package.json'
import { Metrics } from './metrics.js'

/**
* @typedef {object} Env
Expand Down Expand Up @@ -36,10 +37,26 @@ export default {
try {
const upgrade = request.headers.get('Upgrade')
if (upgrade === 'websocket') {
const metrics = new Metrics()
const transport = new WebSockets()
const peerId = await getPeerId(env)
const bs = await getBlockstore(env, ctx)
await getLibp2p(env, bs, peerId, transport)
const bs = await getBlockstore(env, ctx, metrics)
const libp2p = await getLibp2p(env, transport)
libp2p.addEventListener('peer:connect', (evt) => {
const remotePeer = evt.detail
console.log(JSON.stringify({ msg: 'peer:connect', peer: remotePeer.toString() }))
})
libp2p.addEventListener('peer:disconnect', (evt) => {
const remotePeer = evt.detail
console.log(JSON.stringify({ msg: 'peer:disconnect', peer: remotePeer.toString(), ...metrics }))
})
const onError = async (/** @type {Error} */ err) => {
websocket?.close(418, err.message)
await libp2p.stop()
if (!err.message.startsWith('Too many subrequests')) {
throw err
}
}
enableBitswap(libp2p, bs, onError)
const listener = getWebSocketListener(env, transport)
const res = await listener.handleRequest(request)
// @ts-expect-error res will have a raw websocket server on it if worked.
Expand Down
1 change: 1 addition & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"moduleResolution": "node",
"skipLibCheck": true,
"resolveJsonModule": true,
"useUnknownInCatchVariables": true,
"lib": ["ES2022", "DOM"],
"target": "ES2022",
"module": "ES2022",
Expand Down

0 comments on commit 29b2fee

Please sign in to comment.