Skip to content

Commit

Permalink
feat: add bitswap progress events (#50)
Browse files Browse the repository at this point in the history
Upgrades all deps in order to update progress handler types to relay
progress messages from bitswap and the blockstore when fetching blocks.

Also adds tests for block storage.

Fixes #27
  • Loading branch information
achingbrain committed Mar 15, 2023
1 parent 7bcae3c commit 7460719
Show file tree
Hide file tree
Showing 16 changed files with 488 additions and 154 deletions.
8 changes: 4 additions & 4 deletions benchmarks/gc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
"@libp2p/websockets": "^5.0.3",
"aegir": "^38.1.5",
"blockstore-datastore-adapter": "^5.0.0",
"datastore-core": "^8.0.4",
"datastore-fs": "^8.0.0",
"datastore-level": "^9.0.4",
"datastore-core": "^9.0.0",
"datastore-fs": "^9.0.0",
"datastore-level": "^10.0.1",
"execa": "^7.0.0",
"go-ipfs": "^0.18.1",
"helia": "~0.0.0",
Expand All @@ -29,7 +29,7 @@
"it-all": "^2.0.0",
"it-drain": "^2.0.0",
"kubo-rpc-client": "^3.0.1",
"libp2p": "^0.42.2",
"libp2p": "next",
"multiformats": "^11.0.1",
"tinybench": "^2.4.0"
}
Expand Down
20 changes: 11 additions & 9 deletions packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,17 @@
"@libp2p/interface-libp2p": "^1.1.0",
"@libp2p/interfaces": "^3.3.1",
"@libp2p/logger": "^2.0.5",
"blockstore-core": "^3.0.0",
"blockstore-core": "^4.0.0",
"cborg": "^1.10.0",
"datastore-core": "^8.0.4",
"interface-blockstore": "^4.0.1",
"interface-datastore": "^7.0.3",
"interface-store": "^3.0.4",
"ipfs-bitswap": "^16.0.0",
"datastore-core": "^9.0.0",
"interface-blockstore": "^5.0.0",
"interface-datastore": "^8.0.0",
"interface-store": "^4.0.0",
"ipfs-bitswap": "^17.0.0",
"it-all": "^2.0.0",
"it-drain": "^2.0.0",
"it-filter": "^2.0.0",
"it-merge": "^2.0.0",
"it-pushable": "^3.1.2",
"it-foreach": "^1.0.1",
"mortice": "^3.0.1",
"multiformats": "^11.0.1",
"p-defer": "^4.0.0",
Expand All @@ -169,7 +168,10 @@
"@ipld/dag-json": "^10.0.1",
"@libp2p/websockets": "^5.0.3",
"aegir": "^38.1.0",
"libp2p": "^0.42.2"
"delay": "^5.0.0",
"libp2p": "next",
"sinon": "^15.0.2",
"sinon-ts": "^1.0.0"
},
"typedoc": {
"entryPoint": "./src/index.ts"
Expand Down
15 changes: 9 additions & 6 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ export class HeliaImpl implements Helia {
}
})

this.pins = new PinsImpl(datastore, blockstore, init.dagWalkers ?? [])

if (init.libp2p != null) {
this.#bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
Expand All @@ -79,21 +77,26 @@ export class HeliaImpl implements Helia {
})
}

this.pins = new PinsImpl(datastore, blockstore, init.dagWalkers ?? [])

this.libp2p = libp2p
this.blockstore = new BlockStorage(blockstore, this.pins, this.#bitswap)
this.blockstore = new BlockStorage(blockstore, this.pins, {
bitswap: this.#bitswap,
holdGcLock: init.holdGcLock
})
this.datastore = datastore
}

async start (): Promise<void> {
await assertDatastoreVersionIsCurrent(this.datastore)

this.#bitswap?.start()
await this.#bitswap?.start()
await this.libp2p.start()
}

async stop (): Promise<void> {
this.#bitswap?.stop()
await this.libp2p.stop()
await this.#bitswap?.stop()
}

async gc (options: GCOptions = {}): Promise<void> {
Expand All @@ -106,7 +109,7 @@ export class HeliaImpl implements Helia {
log('gc start')

await drain(blockstore.deleteMany((async function * () {
for await (const cid of blockstore.queryKeys({})) {
for await (const { cid } of blockstore.getAll()) {
try {
if (await helia.pins.isPinned(cid, options)) {
continue
Expand Down
19 changes: 18 additions & 1 deletion packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,26 @@ export interface HeliaInit {
dagWalkers?: DAGWalker[]

/**
* Pass `false` to not start the helia node
* Pass `false` to not start the Helia node
*/
start?: boolean

/**
* Garbage collection requires preventing blockstore writes during searches
* for unpinned blocks as DAGs are typically pinned after they've been
* imported - without locking this could lead to the deletion of blocks while
* they are being added to the blockstore.
*
* By default this lock is held on the main process (e.g. node cluster's
* primary process, the renderer thread in browsers) and other processes will
* contact the main process for access (worker processes in node cluster,
* webworkers in the browser).
*
* If Helia is being run wholly in a non-primary process, with no other process
* expected to access the blockstore (e.g. being run in the background in a
* webworker), pass true here to hold the gc lock in this process.
*/
holdGcLock?: boolean
}

/**
Expand Down
151 changes: 68 additions & 83 deletions packages/helia/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
import { BaseBlockstore } from 'blockstore-core'
import merge from 'it-merge'
import { pushable } from 'it-pushable'
import filter from 'it-filter'
import type { Blockstore, KeyQuery, Query } from 'interface-blockstore'
import type { Blockstore } from 'interface-blockstore'
import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents } from '@helia/interface/blocks'
import type { Bitswap } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { AbortOptions } from '@libp2p/interfaces'
import type { AwaitIterable } from 'interface-store'
import type { Mortice } from 'mortice'
import createMortice from 'mortice'
import type { Pins } from '@helia/interface/pins'
import forEach from 'it-foreach'
import { CustomProgressEvent, ProgressOptions } from 'progress-events'

export interface BlockStorageOptions extends AbortOptions {
export interface BlockStorageInit {
holdGcLock?: boolean
bitswap?: Bitswap
}

export interface GetOptions extends AbortOptions {
progress?: (evt: Event) => void
}

Expand All @@ -20,7 +25,7 @@ export interface BlockStorageOptions extends AbortOptions {
* blockstore (that may be on disk, s3, or something else). If the blocks are
* not present Bitswap will be used to fetch them from network peers.
*/
export class BlockStorage extends BaseBlockstore implements Blockstore {
export class BlockStorage implements Blocks {
public lock: Mortice
private readonly child: Blockstore
private readonly bitswap?: Bitswap
Expand All @@ -29,21 +34,13 @@ export class BlockStorage extends BaseBlockstore implements Blockstore {
/**
* Create a new BlockStorage
*/
constructor (blockstore: Blockstore, pins: Pins, bitswap?: Bitswap) {
super()

constructor (blockstore: Blockstore, pins: Pins, options: BlockStorageInit = {}) {
this.child = blockstore
this.bitswap = bitswap
this.bitswap = options.bitswap
this.pins = pins
this.lock = createMortice()
}

async open (): Promise<void> {
await this.child.open()
}

async close (): Promise<void> {
await this.child.close()
this.lock = createMortice({
singleProcess: options.holdGcLock
})
}

unwrap (): Blockstore {
Expand All @@ -53,15 +50,22 @@ export class BlockStorage extends BaseBlockstore implements Blockstore {
/**
* Put a block to the underlying datastore
*/
async put (cid: CID, block: Uint8Array, options: AbortOptions = {}): Promise<void> {
async put (cid: CID, block: Uint8Array, options: AbortOptions & ProgressOptions<PutBlockProgressEvents> = {}): Promise<void> {
const releaseLock = await this.lock.readLock()

try {
if (await this.child.has(cid)) {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:put:duplicate', cid))
return
}

if (this.bitswap?.isStarted() === true) {
await this.bitswap.put(cid, block, options)
} else {
await this.child.put(cid, block, options)
options.onProgress?.(new CustomProgressEvent<CID>('blocks:put:bitswap:notify', cid))
this.bitswap.notify(cid, block, options)
}

options.onProgress?.(new CustomProgressEvent<CID>('blocks:put:blockstore:put', cid))
await this.child.put(cid, block, options)
} finally {
releaseLock()
}
Expand All @@ -70,17 +74,27 @@ export class BlockStorage extends BaseBlockstore implements Blockstore {
/**
* Put a multiple blocks to the underlying datastore
*/
async * putMany (blocks: AwaitIterable<{ key: CID, value: Uint8Array }>, options: AbortOptions = {}): AsyncGenerator<{ key: CID, value: Uint8Array }, void, undefined> {
async * putMany (blocks: AwaitIterable<{ cid: CID, block: Uint8Array }>, options: AbortOptions & ProgressOptions<PutManyBlocksProgressEvents> = {}): AsyncIterable<Pair> {
const releaseLock = await this.lock.readLock()

try {
const missingBlocks = filter(blocks, async ({ key }) => {
return !(await this.child.has(key))
const missingBlocks = filter(blocks, async ({ cid }) => {
const has = await this.child.has(cid)

if (has) {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:put-many:duplicate', cid))
}

return !has
})

const store = this.bitswap?.isStarted() === true ? this.bitswap : this.child
const notifyEach = forEach(missingBlocks, ({ cid, block }) => {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:put-many:bitswap:notify', cid))
this.bitswap?.notify(cid, block, options)
})

yield * store.putMany(missingBlocks, options)
options.onProgress?.(new CustomProgressEvent('blocks:put-many:blockstore:put-many'))
yield * this.child.putMany(notifyEach, options)
} finally {
releaseLock()
}
Expand All @@ -89,54 +103,43 @@ export class BlockStorage extends BaseBlockstore implements Blockstore {
/**
* Get a block by cid
*/
async get (cid: CID, options: BlockStorageOptions = {}): Promise<Uint8Array> {
async get (cid: CID, options: AbortOptions & ProgressOptions<GetBlockProgressEvents> = {}): Promise<Uint8Array> {
const releaseLock = await this.lock.readLock()

try {
if (!(await this.has(cid)) && this.bitswap?.isStarted() === true) {
return await this.bitswap?.get(cid, options)
} else {
return await this.child.get(cid, options)
if (this.bitswap?.isStarted() != null && !(await this.child.has(cid))) {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:bitswap:get', cid))
const block = await this.bitswap.want(cid, options)

options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:blockstore:put', cid))
await this.child.put(cid, block, options)

return block
}

options.onProgress?.(new CustomProgressEvent<CID>('blocks:get:blockstore:get', cid))
return await this.child.get(cid, options)
} finally {
releaseLock()
}
}

/**
* Get multiple blocks back from an array of cids
* Get multiple blocks back from an (async) iterable of cids
*/
async * getMany (cids: AwaitIterable<CID>, options: BlockStorageOptions = {}): AsyncGenerator<Uint8Array, void, undefined> {
async * getMany (cids: AwaitIterable<CID>, options: AbortOptions & ProgressOptions<GetManyBlocksProgressEvents> = {}): AsyncIterable<Uint8Array> {
const releaseLock = await this.lock.readLock()

try {
const getFromBitswap = pushable<CID>({ objectMode: true })
const getFromChild = pushable<CID>({ objectMode: true })

void Promise.resolve().then(async () => {
for await (const cid of cids) {
if (!(await this.has(cid)) && this.bitswap?.isStarted() === true) {
getFromBitswap.push(cid)
} else {
getFromChild.push(cid)
}
options.onProgress?.(new CustomProgressEvent('blocks:get-many:blockstore:get-many'))
yield * this.child.getMany(forEach(cids, async (cid) => {
if (this.bitswap?.isStarted() === true && !(await this.child.has(cid))) {
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get-many:bitswap:get', cid))
const block = await this.bitswap.want(cid, options)
options.onProgress?.(new CustomProgressEvent<CID>('blocks:get-many:blockstore:put', cid))
await this.child.put(cid, block, options)
}

getFromBitswap.end()
getFromChild.end()
}).catch(err => {
getFromBitswap.throw(err)
})

const streams = [
this.child.getMany(getFromChild, options)
]

if (this.bitswap?.isStarted() === true) {
streams.push(this.bitswap.getMany(getFromBitswap, options))
}

yield * merge(...streams)
}))
} finally {
releaseLock()
}
Expand All @@ -145,14 +148,15 @@ export class BlockStorage extends BaseBlockstore implements Blockstore {
/**
* Delete a block from the blockstore
*/
async delete (cid: CID, options: AbortOptions = {}): Promise<void> {
async delete (cid: CID, options: AbortOptions & ProgressOptions<DeleteBlockProgressEvents> = {}): Promise<void> {
const releaseLock = await this.lock.writeLock()

try {
if (await this.pins.isPinned(cid)) {
throw new Error('CID was pinned')
}

options.onProgress?.(new CustomProgressEvent<CID>('blocks:delete:blockstore:delete', cid))
await this.child.delete(cid, options)
} finally {
releaseLock()
Expand All @@ -162,12 +166,13 @@ export class BlockStorage extends BaseBlockstore implements Blockstore {
/**
* Delete multiple blocks from the blockstore
*/
async * deleteMany (cids: AwaitIterable<CID>, options: AbortOptions = {}): AsyncGenerator<CID, void, undefined> {
async * deleteMany (cids: AwaitIterable<CID>, options: AbortOptions & ProgressOptions<DeleteManyBlocksProgressEvents> = {}): AsyncIterable<CID> {
const releaseLock = await this.lock.writeLock()

try {
const storage = this

options.onProgress?.(new CustomProgressEvent('blocks:delete-many:blockstore:delete-many'))
yield * this.child.deleteMany((async function * () {
for await (const cid of cids) {
if (await storage.pins.isPinned(cid)) {
Expand All @@ -191,24 +196,4 @@ export class BlockStorage extends BaseBlockstore implements Blockstore {
releaseLock()
}
}

async * query (q: Query, options: AbortOptions = {}): AsyncGenerator<{ key: CID, value: Uint8Array }, void, undefined> {
const releaseLock = await this.lock.readLock()

try {
yield * this.child.query(q, options)
} finally {
releaseLock()
}
}

async * queryKeys (q: KeyQuery, options: AbortOptions = {}): AsyncGenerator<CID, void, undefined> {
const releaseLock = await this.lock.readLock()

try {
yield * this.child.queryKeys(q, options)
} finally {
releaseLock()
}
}
}
Loading

0 comments on commit 7460719

Please sign in to comment.