Skip to content

Commit

Permalink
feat: calculate car cid and size during validation (#137)
Browse files Browse the repository at this point in the history
Update car key to `${carCid}/${carCid}.car`, so we can start writing
them to carpark.

Calculate the sha256 of the car bytes from the stream as we produce the
linkdex report, to determine if the car is complete / valid.

Log total bytes per CAR as `carSize`. Use this to calculate the
throughput in Loki. Needed as we won't be able to use the size of the
bucket once we start writing to the shared carpark.

Fixes #136

TODO
- [ ] update the production and staging env to write to carpark bucket.

License: MIT

---------

Signed-off-by: Oli Evans <oli@protocol.ai>
  • Loading branch information
olizilla committed Aug 16, 2023
1 parent 551824e commit f4a2880
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 30 deletions.
37 changes: 37 additions & 0 deletions pickup/lib/car.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import crypto from 'node:crypto'
import { compose } from 'node:stream'
import { CarBlockIterator } from '@ipld/car'
import { LinkIndexer } from 'linkdex'
import { CID } from 'multiformats/cid'
import * as Digest from 'multiformats/hashes/digest'
import { sha256 } from 'multiformats/hashes/sha2'

const CAR_CODEC = 0x0202

/**
* @param {AsyncIterable<Uint8Array>} car
Expand All @@ -15,3 +22,33 @@ export async function linkdex (car) {

return linkIndexer.report()
}

/**
 * Build a CARv1 CID (codec 0x0202) from a finished sha256 hash.
 *
 * Calls `hash.digest()`, which finalises the Hash object — it must not be
 * updated or digested again after this.
 *
 * @param {import('node:crypto').Hash} hash - sha256 Hash that has consumed all the CAR bytes
 * @returns {CID} CID for the CAR
 */
export function createCarCid (hash) {
  return CID.createV1(CAR_CODEC, Digest.create(sha256.code, hash.digest()))
}

/**
 * Stream the bytes of a CAR to:
 * - find the total size in bytes
 * - calculate the CAR CID
 * - create a linkdex report to check the dag is complete
 *
 * The source is consumed exactly once: a pass-through generator taps each
 * chunk for hashing and size accounting while forwarding it to linkdex.
 *
 * @param {AsyncIterable<Uint8Array>} car - CAR bytes as an async byte stream
 * @returns {Promise<{carCid: CID, carSize: number, report: object}>} the CAR
 *   CID, total byte length, and the linkdex completeness report
 */
export async function checkCar (car) {
  let carSize = 0
  // Named `hash` (not `sha256`) so it does not shadow the module-level
  // `sha256` import from multiformats, which createCarCid depends on.
  const hash = crypto.createHash('sha256')
  const report = await linkdex(compose(car, async function * (source) {
    for await (const chunk of source) {
      hash.update(chunk)
      carSize += chunk.byteLength
      yield chunk
    }
  }))
  const carCid = createCarCid(hash)
  return { carCid, carSize, report }
}
4 changes: 2 additions & 2 deletions pickup/lib/pickup.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ export function createPickup ({ sqsPoller, carFetcher, s3Uploader, pinTable }) {
logger.info({ cid, origins }, 'Fetching CAR')
await carFetcher.connectTo(origins)
const body = await carFetcher.fetch({ cid, origins, abortCtl })
await upload(body)
logger.info({ cid, origins }, 'OK. Car in S3')
const { carCid, carSize } = await upload(body)
logger.info({ cid, origins, carCid, carSize }, 'OK. Car in S3')
await msg.del() // the message is handled, remove it from queue.
} catch (err) {
if (abortCtl.signal.reason === TOO_BIG) {
Expand Down
47 changes: 19 additions & 28 deletions pickup/lib/s3.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { S3Client, GetObjectCommand, CopyObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import retry from 'p-retry'
import { linkdex } from './car.js'
import { checkCar } from './car.js'
import { logger } from './logger.js'

/**
Expand Down Expand Up @@ -30,38 +30,21 @@ export async function uploadAndVerify ({ client, validationBucket, destinationBu

await s3Upload.done()

await checkCar({ client, bucket: validationBucket, key, cid })

return retry(() => client.send(new CopyObjectCommand({
CopySource: `${validationBucket}/${key}`,
Bucket: destinationBucket,
const res = await retry(() => client.send(new GetObjectCommand({
Bucket: validationBucket,
Key: key
})), { retries: 5, onFailedAttempt: (err) => logger.info({ err, cid }, 'Copy to destination failed, retrying') })
}
})), { retries: 5, onFailedAttempt: (err) => logger.info({ err, cid }, 'Get car from s3 failed, retrying') })

/**
* Fetch the car and stream it through linkdex to check if it's complete
*
* @param {object} config
* @param {S3Client} config.client
* @param {string} config.bucket
* @param {string} config.key
* @param {string} config.cid
*/
export async function checkCar ({ client, bucket, key, cid }) {
let report
let check
try {
report = await retry(async () => {
const res = await client.send(new GetObjectCommand({
Bucket: bucket,
Key: key
}))
return linkdex(res.Body)
}, { retries: 3, onFailedAttempt: (err) => logger.info({ err, cid }, 'checkCar failed, retrying') })
} catch (cause) {
throw new Error('checkCar failed', { cause })
check = await checkCar(res.Body)
} catch (err) {
logger.info({ err, cid }, 'checkCar failed')
throw new Error('checkCar failed', { cause: err })
}

const { carCid, carSize, report } = check

if (report.blocksIndexed === 0) {
logger.info({ report, cid }, 'linkdex: Empty CAR')
throw new Error('Empty CAR')
Expand All @@ -71,6 +54,14 @@ export async function checkCar ({ client, bucket, key, cid }) {
logger.info({ report, cid }, 'linkdex: DAG not complete')
throw new Error('DAG not complete')
}

await retry(() => client.send(new CopyObjectCommand({
CopySource: `${validationBucket}/${key}`,
Bucket: destinationBucket,
Key: `${carCid}/${carCid}.car`
})), { retries: 5, onFailedAttempt: (err) => logger.info({ err, cid }, 'Copy to destination failed, retrying') })

return { carCid, carSize, cid }
}

export class S3Uploader {
Expand Down
14 changes: 14 additions & 0 deletions pickup/test/car.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { packToBlob } from 'ipfs-car/pack/blob'
import { checkCar } from '../lib/car.js'
import test from 'ava'

test('checkCar', async t => {
  // Fixture: a single-block CAR for the raw string 'hello world'.
  const { car } = await packToBlob({ input: 'hello world', wrapWithDirectory: false })
  const result = await checkCar(car.stream())
  // CAR identity and size are deterministic for this fixture.
  t.is(result.carCid.toString(), 'bagbaierao5e6fdcp4p3iyafmbcxudqoe63qcoxegxwpivz5zirw2pulgg4ia')
  t.is(result.carSize, 107)
  // The linkdex report should see exactly one decodable block, forming a complete DAG.
  const { report } = result
  t.is(report.blocksIndexed, 1)
  t.is(report.undecodeable, 0)
  t.is(report.uniqueCids, 1)
  t.is(report.structure, 'Complete')
})

0 comments on commit f4a2880

Please sign in to comment.