Skip to content
This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

Commit

Permalink
generic balanced reducer
Browse files Browse the repository at this point in the history
  • Loading branch information
pgte committed Dec 22, 2016
1 parent 6252850 commit 12a3bf5
Showing 1 changed file with 87 additions and 0 deletions.
87 changes: 87 additions & 0 deletions src/importer/builder/balanced/reducer.js
@@ -0,0 +1,87 @@
'use strict'

const pull = require('pull-stream')
const pullWrite = require('pull-write')
const pushable = require('pull-pushable')
const batch = require('pull-batch')
const through = require('pull-through')

module.exports = function balancedReduceToRoot(specialization, options) {

const deliverRoot = pushable()
const refeed = pushable()

const sink = pullWrite(
function (items, cb) { // write
refeed.push(items)
// items.forEach(item => refeed.push(item))
cb()
},
null, // reduce
1, // max
function (err) {
refeed.end()
}
)


let root

pull(
refeed,
batch(options.maxChildrenPerNode),
through(
function(items) {
if (items.length > 1) {
specialization.reduce(items, function(err, parent) {
if (err) {
this.queue(err)
return
}

root = parent
refeed.push(root)
})
} else {
this.queue(items[0])
}
},
function (end) {
if (root) {
this.queue(root)
}
this.queue(null)
}
),
pull.take(1),
pull.drain(function (root) {
deliverRoot.push(root)
deliverRoot.end()
})
)

return {
source: deliverRoot,
sink: sink
}
}

// const specialization = {
// reduce: function (items, callback) {
// callback(null, {
// children: items
// })
// }
// }

// const options = {
// maxChildrenPerNode: 5
// }

// pull(
// pull.values([1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ,11, 12, 13, 14]),
// balancedReduceToRoot(specialization, options),
// pull.drain((root) => {
// console.log('root is: %s', JSON.stringify(root, undefined, ' '))
// })
// )

0 comments on commit 12a3bf5

Please sign in to comment.