Skip to content

Commit

Permalink
feat: expose progress events from importer, blockstore and bitswap (#13)
Browse files Browse the repository at this point in the history
Expose progress event types that are passed to `onProgress` callback passed to all operations.
  • Loading branch information
achingbrain committed Mar 15, 2023
1 parent 4c8d124 commit de78f4d
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 31 deletions.
3 changes: 2 additions & 1 deletion packages/interop/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@
"go-ipfs": "^0.18.1",
"helia": "next",
"ipfs-core-types": "^0.14.0",
"ipfs-unixfs-importer": "^15.0.0",
"ipfs-unixfs-importer": "^15.0.1",
"ipfsd-ctl": "^13.0.0",
"it-to-buffer": "^3.0.1",
"kubo-rpc-client": "^3.0.0",
"libp2p": "next",
"merge-options": "^3.0.4",
Expand Down
82 changes: 82 additions & 0 deletions packages/interop/test/bitswap.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import { createHeliaNode } from './fixtures/create-helia.js'
import { createKuboNode } from './fixtures/create-kubo.js'
import type { Helia } from '@helia/interface'
import type { Controller } from 'ipfsd-ctl'
import { UnixFS, unixfs } from '@helia/unixfs'
import type { FileCandidate } from 'ipfs-unixfs-importer'
import toBuffer from 'it-to-buffer'

describe('unixfs bitswap interop', () => {
let helia: Helia
let unixFs: UnixFS
let kubo: Controller

beforeEach(async () => {
helia = await createHeliaNode()
unixFs = unixfs(helia)
kubo = await createKuboNode()

// connect helia to kubo
await helia.libp2p.peerStore.addressBook.add(kubo.peer.id, kubo.peer.addresses)
await helia.libp2p.dial(kubo.peer.id)
})

afterEach(async () => {
if (helia != null) {
await helia.stop()
}

if (kubo != null) {
await kubo.stop()
}
})

it('should add a large file to helia and fetch it from kubo', async () => {
const chunkSize = 1024 * 1024
const size = chunkSize * 10
const input: Uint8Array[] = []

const candidate: FileCandidate = {
content: (async function * () {
for (let i = 0; i < size; i += chunkSize) {
const buf = new Uint8Array(chunkSize)
input.push(buf)

yield buf
}
}())
}

const cid = await unixFs.addFile(candidate)

const bytes = await toBuffer(kubo.api.cat(cid))

expect(bytes).to.equalBytes(await toBuffer(input))
})

it('should add a large file to kubo and fetch it from helia', async () => {
const chunkSize = 1024 * 1024
const size = chunkSize * 10
const input: Uint8Array[] = []

const candidate: FileCandidate = {
content: (async function * () {
for (let i = 0; i < size; i += chunkSize) {
const buf = new Uint8Array(chunkSize)
input.push(buf)

yield buf
}
}())
}

const { cid } = await kubo.api.add(candidate.content)

const bytes = await toBuffer(unixFs.cat(cid))

expect(bytes).to.equalBytes(await toBuffer(input))
})
})
12 changes: 6 additions & 6 deletions packages/interop/test/files.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,31 @@ import { createHeliaNode } from './fixtures/create-helia.js'
import { createKuboNode } from './fixtures/create-kubo.js'
import type { Helia } from '@helia/interface'
import type { Controller } from 'ipfsd-ctl'
import { UnixFS, unixfs } from '@helia/unixfs'
import { AddOptions, UnixFS, unixfs } from '@helia/unixfs'
import { balanced } from 'ipfs-unixfs-importer/layout'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import type { FileCandidate, ImporterOptions } from 'ipfs-unixfs-importer'
import type { FileCandidate } from 'ipfs-unixfs-importer'
import type { CID } from 'multiformats/cid'
import type { AddOptions } from 'ipfs-core-types/src/root.js'
import type { AddOptions as KuboAddOptions } from 'ipfs-core-types/src/root.js'

describe('unixfs interop', () => {
let helia: Helia
let unixFs: UnixFS
let kubo: Controller

async function importToHelia (data: FileCandidate, opts?: Partial<ImporterOptions>): Promise<CID> {
async function importToHelia (data: FileCandidate, opts?: Partial<AddOptions>): Promise<CID> {
const cid = await unixFs.addFile(data, opts)

return cid
}

async function importToKubo (data: FileCandidate, opts?: AddOptions): Promise<CID> {
async function importToKubo (data: FileCandidate, opts?: KuboAddOptions): Promise<CID> {
const result = await kubo.api.add(data.content, opts)

return result.cid
}

async function expectSameCid (data: () => FileCandidate, heliaOpts: Partial<ImporterOptions> = {}, kuboOpts: AddOptions = {}): Promise<void> {
async function expectSameCid (data: () => FileCandidate, heliaOpts: Partial<AddOptions> = {}, kuboOpts: KuboAddOptions = {}): Promise<void> {
const heliaCid = await importToHelia(data(), {
// these are the default kubo options
cidVersion: 0,
Expand Down
8 changes: 6 additions & 2 deletions packages/unixfs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,13 @@
"hamt-sharding": "^3.0.2",
"interface-blockstore": "^5.0.0",
"ipfs-unixfs": "^11.0.0",
"ipfs-unixfs-exporter": "^13.0.0",
"ipfs-unixfs-importer": "^15.0.0",
"ipfs-unixfs-exporter": "^13.0.1",
"ipfs-unixfs-importer": "^15.0.1",
"it-last": "^2.0.0",
"it-pipe": "^2.0.5",
"merge-options": "^3.0.4",
"multiformats": "^11.0.1",
"progress-events": "^1.0.0",
"sparse-array": "^1.3.2"
},
"devDependencies": {
Expand All @@ -164,5 +165,8 @@
"it-first": "^2.0.0",
"it-to-buffer": "^3.0.0",
"uint8arrays": "^4.0.3"
},
"typedoc": {
"entryPoint": "./src/index.ts"
}
}
3 changes: 2 additions & 1 deletion packages/unixfs/src/commands/chmod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export async function chmod (cid: CID, mode: number, blockstore: Blocks, options
// but do not reimport files, only manipulate dag-pb nodes
const root = await pipe(
async function * () {
for await (const entry of recursive(resolved.cid, blockstore)) {
for await (const entry of recursive(resolved.cid, blockstore, options)) {
let metadata: UnixFS
let links: PBLink[] = []

Expand All @@ -63,6 +63,7 @@ export async function chmod (cid: CID, mode: number, blockstore: Blocks, options
}
}
},
// @ts-expect-error cannot combine progress types
(source) => importer(source, blockstore, {
...opts,
dagBuilder: async function * (source, block) {
Expand Down
50 changes: 29 additions & 21 deletions packages/unixfs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
*/

import type { CID, Version } from 'multiformats/cid'
import type { Blocks } from '@helia/interface/blocks'
import type { Blocks, GetBlockProgressEvents, PutBlockProgressEvents } from '@helia/interface/blocks'
import type { AbortOptions } from '@libp2p/interfaces'
import { addAll, addBytes, addByteStream, addDirectory, addFile } from './commands/add.js'
import { cat } from './commands/cat.js'
Expand All @@ -45,16 +45,24 @@ import { touch } from './commands/touch.js'
import { chmod } from './commands/chmod.js'
import type { UnixFSEntry } from 'ipfs-unixfs-exporter'
import { ls } from './commands/ls.js'
import type { ByteStream, DirectoryCandidate, FileCandidate, ImportCandidateStream, ImporterOptions, ImportResult } from 'ipfs-unixfs-importer'
import type { ByteStream, DirectoryCandidate, FileCandidate, ImportCandidateStream, ImporterOptions, ImportProgressEvents, ImportResult } from 'ipfs-unixfs-importer'
import type { ProgressOptions } from 'progress-events'

export interface UnixFSComponents {
blockstore: Blocks
}

export type AddEvents = PutBlockProgressEvents
| ImportProgressEvents

export interface AddOptions extends AbortOptions, Omit<ImporterOptions, 'onProgress'>, ProgressOptions<AddEvents> {

}

/**
* Options to pass to the cat command
*/
export interface CatOptions extends AbortOptions {
export interface CatOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
/**
* Start reading the file at this offset
*/
Expand All @@ -74,7 +82,7 @@ export interface CatOptions extends AbortOptions {
/**
* Options to pass to the chmod command
*/
export interface ChmodOptions extends AbortOptions {
export interface ChmodOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutBlockProgressEvents> {
/**
* If the target of the operation is a directory and this is true,
* apply the new mode to all directory contents
Expand All @@ -96,7 +104,7 @@ export interface ChmodOptions extends AbortOptions {
/**
* Options to pass to the cp command
*/
export interface CpOptions extends AbortOptions {
export interface CpOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutBlockProgressEvents> {
/**
* If true, allow overwriting existing directory entries (default: false)
*/
Expand All @@ -112,7 +120,7 @@ export interface CpOptions extends AbortOptions {
/**
* Options to pass to the ls command
*/
export interface LsOptions extends AbortOptions {
export interface LsOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
/**
* Optional path to list subdirectory contents if the target CID resolves to
* a directory
Expand All @@ -133,7 +141,7 @@ export interface LsOptions extends AbortOptions {
/**
* Options to pass to the mkdir command
*/
export interface MkdirOptions extends AbortOptions {
export interface MkdirOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutBlockProgressEvents> {
/**
* The CID version to create the new directory with - defaults to the same
* version as the containing directory
Expand Down Expand Up @@ -165,7 +173,7 @@ export interface MkdirOptions extends AbortOptions {
/**
* Options to pass to the rm command
*/
export interface RmOptions extends AbortOptions {
export interface RmOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutBlockProgressEvents> {
/**
* DAGs with a root block larger than this value will be sharded. Blocks
* smaller than this value will be regular UnixFS directories.
Expand All @@ -176,7 +184,7 @@ export interface RmOptions extends AbortOptions {
/**
* Options to pass to the stat command
*/
export interface StatOptions extends AbortOptions {
export interface StatOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents> {
/**
* An optional path to allow statting paths inside directories
*/
Expand Down Expand Up @@ -292,7 +300,7 @@ export interface UnixFS {
* }
* ```
*/
addAll: (source: ImportCandidateStream, options?: Partial<ImporterOptions>) => AsyncIterable<ImportResult>
addAll: (source: ImportCandidateStream, options?: Partial<AddOptions>) => AsyncIterable<ImportResult>

/**
* Add a single `Uint8Array` to your Helia node as a file.
Expand All @@ -305,7 +313,7 @@ export interface UnixFS {
* console.info(cid)
* ```
*/
addBytes: (bytes: Uint8Array, options?: Partial<ImporterOptions>) => Promise<CID>
addBytes: (bytes: Uint8Array, options?: Partial<AddOptions>) => Promise<CID>

/**
* Add a stream of `Uint8Array` to your Helia node as a file.
Expand All @@ -321,7 +329,7 @@ export interface UnixFS {
* console.info(cid)
* ```
*/
addByteStream: (bytes: ByteStream, options?: Partial<ImporterOptions>) => Promise<CID>
addByteStream: (bytes: ByteStream, options?: Partial<AddOptions>) => Promise<CID>

/**
* Add a file to your Helia node with optional metadata.
Expand All @@ -342,7 +350,7 @@ export interface UnixFS {
* console.info(cid)
* ```
*/
addFile: (file: FileCandidate, options?: Partial<ImporterOptions>) => Promise<CID>
addFile: (file: FileCandidate, options?: Partial<AddOptions>) => Promise<CID>

/**
* Add a directory to your Helia node.
Expand All @@ -355,7 +363,7 @@ export interface UnixFS {
* console.info(cid)
* ```
*/
addDirectory: (dir?: Partial<DirectoryCandidate>, options?: Partial<ImporterOptions>) => Promise<CID>
addDirectory: (dir?: Partial<DirectoryCandidate>, options?: Partial<AddOptions>) => Promise<CID>

/**
* Retrieve the contents of a file from your Helia node.
Expand All @@ -368,7 +376,7 @@ export interface UnixFS {
* }
* ```
*/
cat: (cid: CID, options?: Partial<CatOptions>) => AsyncIterable<Uint8Array>
cat: (cid: CID, options?: Partial<CatOptions> & ProgressOptions<GetBlockProgressEvents>) => AsyncIterable<Uint8Array>

/**
* Change the permissions on a file or directory in a DAG
Expand Down Expand Up @@ -415,7 +423,7 @@ export interface UnixFS {
* }
* ```
*/
ls: (cid: CID, options?: Partial<LsOptions>) => AsyncIterable<UnixFSEntry>
ls: (cid: CID, options?: Partial<LsOptions> & ProgressOptions<GetBlockProgressEvents>) => AsyncIterable<UnixFSEntry>

/**
* Make a new directory under an existing directory.
Expand Down Expand Up @@ -489,23 +497,23 @@ class DefaultUnixFS implements UnixFS {
this.components = components
}

async * addAll (source: ImportCandidateStream, options: Partial<ImporterOptions> = {}): AsyncIterable<ImportResult> {
async * addAll (source: ImportCandidateStream, options: Partial<AddOptions> = {}): AsyncIterable<ImportResult> {
yield * addAll(source, this.components.blockstore, options)
}

async addBytes (bytes: Uint8Array, options: Partial<ImporterOptions> = {}): Promise<CID> {
async addBytes (bytes: Uint8Array, options: Partial<AddOptions> = {}): Promise<CID> {
return await addBytes(bytes, this.components.blockstore, options)
}

async addByteStream (bytes: ByteStream, options: Partial<ImporterOptions> = {}): Promise<CID> {
async addByteStream (bytes: ByteStream, options: Partial<AddOptions> = {}): Promise<CID> {
return await addByteStream(bytes, this.components.blockstore, options)
}

async addFile (file: FileCandidate, options: Partial<ImporterOptions> = {}): Promise<CID> {
async addFile (file: FileCandidate, options: Partial<AddOptions> = {}): Promise<CID> {
return await addFile(file, this.components.blockstore, options)
}

async addDirectory (dir: Partial<DirectoryCandidate> = {}, options: Partial<ImporterOptions> = {}): Promise<CID> {
async addDirectory (dir: Partial<DirectoryCandidate> = {}, options: Partial<AddOptions> = {}): Promise<CID> {
return await addDirectory(dir, this.components.blockstore, options)
}

Expand Down

0 comments on commit de78f4d

Please sign in to comment.