Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose configured dag walkers and hashers on helia interface #381

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions packages/block-brokers/src/bitswap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { ProgressOptions } from 'progress-events'
interface BitswapComponents {
libp2p: Libp2p
blockstore: Blockstore
hashers: MultihashHasher[]
hashers: Record<string, MultihashHasher>
}

export interface BitswapInit extends BitswapOptions {
Expand All @@ -29,9 +29,15 @@ ProgressOptions<BitswapWantBlockProgressEvents>
this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})
let hasher: MultihashHasher | undefined

if (typeof codecOrName === 'string') {
hasher = Object.values(hashers).find(hasher => {
return hasher.name === codecOrName
})
} else {
hasher = hashers[codecOrName]
}

if (hasher != null) {
return hasher
Expand Down
3 changes: 1 addition & 2 deletions packages/car/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@
"dependencies": {
"@helia/interface": "^3.0.1",
"@ipld/car": "^5.1.1",
"@ipld/dag-pb": "^4.0.6",
"@libp2p/interfaces": "^3.3.1",
"cborg": "^4.0.3",
"it-drain": "^3.0.5",
"it-map": "^3.0.3",
"multiformats": "^13.0.0",
Expand All @@ -153,6 +151,7 @@
},
"devDependencies": {
"@helia/unixfs": "^2.0.1",
"@ipld/dag-pb": "^4.0.8",
"aegir": "^42.1.0",
"blockstore-core": "^4.3.10",
"interface-blockstore": "^5.2.9",
Expand Down
42 changes: 6 additions & 36 deletions packages/car/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import map from 'it-map'
import defer from 'p-defer'
import PQueue from 'p-queue'
import { cborWalker, dagPbWalker, jsonWalker, rawWalker } from './utils/dag-walkers.js'
import type { DAGWalker } from '@helia/interface'
import type { Blocks, GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
import type { CarReader, CarWriter } from '@ipld/car'
import type { AbortOptions } from '@libp2p/interfaces'
Expand All @@ -71,23 +71,7 @@

export interface CarComponents {
blockstore: Blocks
}

export interface CarInit {
/**
* In order to export CIDs that correspond to a DAG, it's necessary to know
* how to traverse that DAG. DAGWalkers take a block and yield any CIDs
* encoded within that block.
*/
dagWalkers?: DAGWalker[]
}

/**
* DAGWalkers take a block and yield CIDs encoded in that block
*/
export interface DAGWalker {
codec: number
walk(block: Uint8Array): AsyncGenerator<CID, void, undefined>
dagWalkers: Record<number, DAGWalker>
}

/**
Expand Down Expand Up @@ -146,27 +130,13 @@
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void>
}

const DEFAULT_DAG_WALKERS = [
rawWalker,
dagPbWalker,
cborWalker,
jsonWalker
]

const DAG_WALK_QUEUE_CONCURRENCY = 1

class DefaultCar implements Car {
private readonly components: CarComponents
private dagWalkers: Record<number, DAGWalker>

constructor (components: CarComponents, init: CarInit) {
constructor (components: CarComponents, init: any) {
this.components = components

this.dagWalkers = {}

;[...DEFAULT_DAG_WALKERS, ...(init.dagWalkers ?? [])].forEach(dagWalker => {
this.dagWalkers[dagWalker.codec] = dagWalker
})
}

async import (reader: Pick<CarReader, 'blocks'>, options?: AbortOptions & ProgressOptions<PutManyBlocksProgressEvents>): Promise<void> {
Expand All @@ -188,7 +158,7 @@
deferred.resolve()
})
queue.on('error', (err) => {
deferred.resolve(err)
deferred.reject(err)

Check warning on line 161 in packages/car/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/car/src/index.ts#L161

Added line #L161 was not covered by tests
})

for (const root of roots) {
Expand All @@ -212,7 +182,7 @@
* and update the pin count for them
*/
async #walkDag (cid: CID, queue: PQueue, withBlock: (cid: CID, block: Uint8Array) => Promise<void>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
const dagWalker = this.dagWalkers[cid.code]
const dagWalker = this.components.dagWalkers[cid.code]

if (dagWalker == null) {
throw new Error(`No dag walker found for cid codec ${cid.code}`)
Expand All @@ -234,6 +204,6 @@
/**
* Create a {@link Car} instance for use with {@link https://github.com/ipfs/helia Helia}
*/
export function car (helia: { blockstore: Blocks }, init: CarInit = {}): Car {
export function car (helia: { blockstore: Blocks, dagWalkers: Record<number, DAGWalker> }, init: any = {}): Car {
return new DefaultCar(helia, init)
}
176 changes: 0 additions & 176 deletions packages/car/src/utils/dag-walkers.ts

This file was deleted.

37 changes: 32 additions & 5 deletions packages/car/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,58 @@

import { type UnixFS, unixfs } from '@helia/unixfs'
import { CarReader } from '@ipld/car'
import * as dagPb from '@ipld/dag-pb'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import toBuffer from 'it-to-buffer'
import * as raw from 'multiformats/codecs/raw'
import { car, type Car } from '../src/index.js'
import { largeFile, smallFile } from './fixtures/files.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { DAGWalker } from '@helia/interface'
import type { Blockstore } from 'interface-blockstore'

/**
* Dag walker for dag-pb CIDs
*/
const dagPbWalker: DAGWalker = {
codec: dagPb.code,
* walk (block) {
const node = dagPb.decode(block)

yield * node.Links.map(l => l.Hash)
}
}

const rawWalker: DAGWalker = {
codec: raw.code,
* walk () {
// no embedded CIDs in a raw block
}
}

describe('import', () => {
let blockstore: Blockstore
let c: Car
let u: UnixFS
let dagWalkers: Record<number, DAGWalker>

beforeEach(async () => {
blockstore = new MemoryBlockstore()
dagWalkers = {
[dagPb.code]: dagPbWalker,
[raw.code]: rawWalker
}

c = car({ blockstore })
c = car({ blockstore, dagWalkers })
u = unixfs({ blockstore })
})

it('exports and imports a car file', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const cid = await otherUnixFS.addBytes(smallFile)

const writer = memoryCarWriter(cid)
Expand All @@ -46,7 +73,7 @@ describe('import', () => {

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const cid1 = await otherUnixFS.addBytes(fileData1)
const cid2 = await otherUnixFS.addBytes(fileData2)
const cid3 = await otherUnixFS.addBytes(fileData3)
Expand All @@ -66,7 +93,7 @@ describe('import', () => {
it('exports and imports a multiple block car file', async () => {
const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const cid = await otherUnixFS.addBytes(largeFile, {
chunker: fixedSize({
chunkSize: 1024
Expand All @@ -90,7 +117,7 @@ describe('import', () => {

const otherBlockstore = new MemoryBlockstore()
const otherUnixFS = unixfs({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore })
const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
const cid1 = await otherUnixFS.addBytes(fileData1, {
chunker: fixedSize({
chunkSize: 2
Expand Down
Loading