Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: read() w/ tests
  • Loading branch information
mikeal committed Mar 20, 2020
1 parent 858fb14 commit aa30091
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 10 deletions.
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -12,6 +12,7 @@
"license": "(Apache-2.0 AND MIT)",
"dependencies": {
"@ipld/block": "^2.1.4",
"hundreds": "0.0.2",
"ipld-schema": "^0.3.2",
"ipld-schema-validation": "^0.3.0"
},
Expand Down
35 changes: 29 additions & 6 deletions src/bare.js
@@ -1,9 +1,12 @@
const validate = require('ipld-schema-validation')(require('./schema.json'))

const cidSymbol = Symbol.for('@ipld/js-cid/CID')
const isCID = node => !!(node && node[cidSymbol])

const sum = (x, y) => x + y

module.exports = (Block, codec) => {
const balanced = async function * (parts, opts) {
const balanced = (opts={}) => async function * (parts) {
parts = [...parts]
const limit = opts.limit || 1000
if (parts.length > limit) {
Expand All @@ -13,7 +16,7 @@ module.exports = (Block, codec) => {
const chunk = parts.splice(0, size)
const length = chunk.map(([l]) => l).reduce(sum)
let last
for await (const block of balanced(chunk, opts)) {
for await (const block of balanced(opts)(chunk)) {
yield block
last = block
}
Expand All @@ -24,20 +27,40 @@ module.exports = (Block, codec) => {
validate(parts, 'FlexibleByteLayout')
yield Block.encoder(parts, 'dag-cbor')
}
const fromGenerator = async function * (gen, algo = balanced, opts = {}) {
const parts = []
const _balanced = balanced()
const fromGenerator = async function * (gen, algo = _balanced) {
const parts = []
for await (const buffer of gen) {
const block = Block.encoder(buffer, 'raw')
yield block
parts.push([buffer.length, await block.cid()])
}
yield * algo(parts, opts)
yield * algo(parts)
}
const size = block => {
const data = Block.isBlock(block) ? block.decode() : block
validate(data, 'FlexibleByteLayout')
if (Buffer.isBuffer(data)) return data.length
return data.map(([l]) => l).reduce(sum)
}
return { from: fromGenerator, balanced, size }
const read = async function * (block, get, offset = 0, end = Infinity) {
if (isCID(block)) block = await get(block)
const decoded = Block.isBlock(block) ? block.decodeUnsafe() : block
if (Buffer.isBuffer(decoded)) {
yield decoded.subarray(offset, end)
return
}

validate(decoded, 'FlexibleByteLayout')

let i = 0
for (const [length, link] of decoded) {
if (i > end) return

yield * read(link, get, offset > i ? offset - i : 0, end - i)

i += length
}
}
return { from: fromGenerator, balanced, size, read }
}
68 changes: 64 additions & 4 deletions test/test-basics.js
Expand Up @@ -3,13 +3,16 @@ const main = require('../')
const test = it
const assert = require('assert')
const same = assert.deepStrictEqual
const { promisify } = require('util')
const randomBytes = promisify(require('crypto').randomBytes)
const Block = require('@ipld/block')

const chunk = Buffer.alloc(1024, '\n')

const mkgen = async function * (length) {
const mkgen = async function * (length, _chunk = chunk) {
let i = 0
while (i < length) {
yield chunk
yield _chunk
i++
}
}
Expand All @@ -23,7 +26,8 @@ test('basic inline buffer', async () => {
const testMany = async (i, limit) => {
const blocks = { 'dag-cbor': [], raw: [] }
let last
for await (const block of main.from(mkgen(i, { limit }))) {
const balanced = main.balanced({limit})
for await (const block of main.from(mkgen(i), balanced)) {
const cid = await block.cid()
blocks[cid.codec].push(block)
last = block
Expand All @@ -41,6 +45,62 @@ test('basic stream', async () => {

test('nested stream', async () => {
const [root, blocks] = await testMany(500, 100)
same(blocks.length, 1)
same(blocks.length, 6)
same(main.size(root), 500 * 1024)
})

const toBuffer = async gen => {
const buffers = []
for await (const buffer of gen) {
buffers.push(buffer)
}
return Buffer.concat(buffers)
}

const load = async gen => {
const store = new Map()
let last
for await (const block of gen) {
const cid = await block.cid()
store.set(cid.toString(), block)
last = block
}
const get = cid => new Promise(resolve => resolve(store.get(cid.toString())))
return [get, last]
}

const read = async (...args) => toBuffer(main.read(...args))

test('read inline', async () => {
same(await read(chunk), chunk)
same(await read(Block.encoder(chunk, 'raw')), chunk)
})

test('read nested full', async () => {
const balanced = main.balanced({limit: 100})
const [get, root] = await load(main.from(mkgen(500), balanced))
const data = await read(root, get)
const comp = await toBuffer(mkgen(500))
same(data.length, comp.length)
assert.ok(!Buffer.compare(data, comp))
})

test('read nested sliding', async () => {
const buffer = await randomBytes(1024)
const balanced = main.balanced({limit: 2})
const [get, root] = await load(main.from(mkgen(10, buffer), balanced))
const data = await read(root, get)
const comp = await toBuffer(mkgen(10, buffer))
same(data.length, comp.length)
assert.ok(!Buffer.compare(data, comp))

const length = 10 * 1024
let start = 0
let end = 40
while (end <= length) {
const data = await read(root, get, start, end)
Buffer.compare(data, comp.subarray(start, end))
start += 1
end += 2
}
})

0 comments on commit aa30091

Please sign in to comment.