Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

在 Node.js 中用 pipe 处理数组的思考与实现 #34

Open
zwhu opened this issue Mar 18, 2017 · 0 comments
Open

在 Node.js 中用 pipe 处理数组的思考与实现 #34

zwhu opened this issue Mar 18, 2017 · 0 comments
Labels

Comments

@zwhu
Copy link
Owner

zwhu commented Mar 18, 2017

TLDR;
这篇文章的风格是在致敬 Jim 老师;致敬,致敬,懂吗,不是抄袭,程序员的事怎么能叫抄袭。
当然我对 Node.js 的 stream 也是现学现卖,有使用不当的地方,敬请指出。
原文链接 欢迎 star。

写这篇文章的初衷是年前看 SICP 的时候,第二章介绍构造数据抽象的时候有提到 Lisp 对序列的处理采用类似『信号流』的方式。所以很自然的就想到了 Node.js 中的 pipe 方式,于是就一直想用 pipe 的方式尝试一下。

同 Jim 老师的这篇 文章 中描述的一样, 我也是懒癌发作,从年尾拖到今年年初,然后在年初又看到了 Jim 老师 的博客,深受启发,终于下定决心要开始码了...... 然后,嗯,又拖到昨天。促使我下定决心要写的主要原因是昨天部门的年会!反正年会跟我这种死肥宅也没多大关系,在大家 happy 的时候构思了下代码实现,回家用了一晚上的时候补上了代码。

Jim 老师在他的文章里面也说了,JS 的那些数组操作 (map/ reduce/filter) 啥的,每次调用的时候都会进行一次完整的遍历。试想一下如果有一个第一个数是1,长度是 1亿 的递增为 1 的数组,需要把所有的数组都乘 3,再排除其中的奇数,如果用 (map/filter) 的方法,只要也需要循环 一亿五千万次;那么如果有其他办法能只循环一亿次,是不是节省了大量的内存资源和循环消耗的时间。

废话不多说,直接上代码吧。

pipe

在编写代码时,我们应该有一些方法将程序像连接水管一样连接起来 -- 当我们需要获取一些数据时,可以去通过"拧"其他的部分来达到目的。这也应该是IO应有的方式。 -- Doug McIlroy. October 11, 1964

关于 node 的 stream 可以看看这篇 文章

下面是代码部分,整个代码我是在边学 pipe 边用一晚上的时间仓促写就的,懒癌发作,也不想再重构了,各位相公讲究看吧,求别喷代码。

入口

const stream = require('stream')

const last = Symbol()

// 在 selfArray 中接收一个真正的数组
// 返回一个可读流
// 如果再做的精细点,可以做成可读可写流,这样就能通过控制流的大小,来控制内存的大小,别几亿条数据直接撑爆内存了
// 不过对后面 reduce 的处理就比较麻烦
function selfArray(a) {
  const rs = new stream.Readable({
    objectMode: true
  })

  a.forEach((v, index) => {
    rs.push(v)
  })
  rs.push(last)
  rs.push(null)
  return rs
}

上面的 selfArray 在流的最后面 push 了一个 Symbol 对象来标志整个流的输入结束,留待为之后 reduce 的使用。

Map/Filter/Reduce 的实现

function forEach(callback) {
  const ws = new stream.Writable({
    objectMode: true
  })
  let index = 0

  ws._write = function (chunk, enc, next) {
    if (chunk !== last) {
      callback(chunk, index++)
      next()
    }
  }

  return ws
}

function filter(callback) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })

  let index = 0

  trans._transform = function (chunk, enc, next) {
    if (chunk === last) {
      next(null, last)
    } else {
      let condition = callback(chunk, index++)
      if (condition) {
        this.push(chunk)
      }
      next()
    }
  }
  return trans
}

function map(callback) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })
  let index = 0
  trans._transform = function (chunk, enc, next) {
    if (chunk === last) {
      next(null, last)
    } else {
      next(null, callback(chunk, index++))
    }
  }
  return trans
}

function reduce(callback, initial) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })

  let index = 0,
    current = initial,
    prev = initial


  trans._transform = function (chunk, enc, next) {

    if (chunk === last) {
      if (index > 1) {
        prev = callback(prev, current, index - 1)
      }
      this.push(prev)
      this.push(last)
      return next(null, last)
    }

    if (initial === void 0 && index === 0) {
      prev = chunk
    }

    if (index > 0) {
      prev = callback(prev, current, index - 1)
    }

    current = chunk
    index++
    next()
  }

  return trans
}

上面的代码在 reduce 的实现稍微麻烦了一些,reduce 对没有初始值,原始数组为空的条件下有各种不同的处理情况,翻看了下 MDN 的解释又自己实现了下。

使用

selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4])
  .pipe(map(v => v * 3))
  .pipe(filter(v => v % 2))
  .pipe(reduce((p, c) => p + c, 0))
  .pipe(forEach(v => {
    console.log('pipe 计算最后的结果是:', v)
  }))

为了好看我故意把各种括号都删掉了。嗯,看起来还挺完美,我们来测试下

selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4])
  .pipe(map(v => {
    console.log('map:', v)
    return v * 3
  }))
  .pipe(filter(v => {
    console.log('filter:', v)
    return v % 2
  }))
  .pipe(reduce((p, c) => {
    console.log('reduce:', p, c)
    return p + c
  }, 0))
  .pipe(forEach(v => {
    console.log('pipe 计算最后的结果是:', v)
  }))
  
  
加上 log 之后可以看到结算结果是:
  
map: 9
filter: 27
map: 2
filter: 6
map: 6
filter: 18
map: 3
filter: 9
reduce: 0 27
map: 5
filter: 15
reduce: 27 9
map: 6
filter: 18
map: 7
filter: 21
reduce: 36 15
map: 1
filter: 3
reduce: 51 21
map: 4
filter: 12
map: 4
filter: 12
reduce: 72 3
pipe 计算最后的结果是: 75

从上面的 log 可以看到, 第一个数 9 先执行了 map,然后在 * 3 之后就直接进入了 filter,此时第 2 个数 2 也开始被 map 处理,然后被 filter 处理,但是由于 * 3 之后是偶数不会被 reduce 接收, reduce 会一直等到第二个奇数,也就是 3 进入之后才会被处理... 嗯,直到最终的计算结果是 75, 被 forEach 消耗。

总结

虽然我没有像 Jim 老师一样进行性能测试,但是猜测也知道 pipe 的方式在数量比较小的时候肯定要弱于正常方式,pipe 的好处在于数据量比较大的时候,可以使用比较小的内存,尽快的处理数组中前置的数据。

@zwhu zwhu added the 思考 label Mar 18, 2017
@zwhu zwhu changed the title 在 Node.js 中用 pipe 处理数组的实现 在 Node.js 中用 pipe 处理数组的思考与实现 Mar 23, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant