Permalink
Browse files

Take 2! So much stupid commenting, but working pretty completely at t…

…his point
  • Loading branch information...
1 parent ab3f7ae commit c930b54fea9d78f73490445e5142c7c0f8affaba @isaacs isaacs committed Nov 5, 2011
Showing with 750 additions and 156 deletions.
  1. +1 −1 examples/pipe.js
  2. +11 −2 examples/reader.js
  3. +28 −3 lib/collect.js
  4. +171 −0 lib/dir-reader.js
  5. +30 −15 lib/dir-writer.js
  6. +96 −0 lib/file-reader.js
  7. +5 −6 lib/file-writer.js
  8. +1 −0 lib/get-type.js
  9. +53 −0 lib/link-reader.js
  10. +1 −2 lib/link-writer.js
  11. +80 −0 lib/proxy-reader.js
  12. +91 −0 lib/proxy-writer.js
  13. +150 −104 lib/reader.js
  14. +32 −23 lib/writer.js
View
@@ -1,6 +1,6 @@
var fstream = require("../fstream.js")
var path = require("path")
-
+debugger
var r = fstream.Reader({ path: path.dirname(__dirname)
, filter: function () {
return !this.basename.match(/^\./) &&
View
@@ -7,12 +7,21 @@ var r = fstream.Reader({ path: path.dirname(__dirname)
}
})
+console.error(r instanceof fstream.Reader)
+console.error(r instanceof require("stream").Stream)
+console.error(r instanceof require("events").EventEmitter)
+console.error(r.on)
+
+r.on("stat", function () {
+ console.error("a %s !!!\t", r.type, r.path)
+})
+
r.on("entries", function (entries) {
- console.error("the entries", entries)
+ console.error("\t" + entries.join("\n\t"))
})
r.on("entry", function (entry) {
- console.error("a %s appears!", entry.type, entry.path)
+ console.error("a %s !!!\t", entry.type, entry.path)
})
r.on("end", function () {
View
@@ -1,6 +1,11 @@
module.exports = collect
function collect (stream) {
+ if (stream._collected) return
+
+ // console.error("collect", stream.path, new Error("trace").stack)
+
+ stream._collected = true
stream.pause()
stream.on("data", save)
@@ -19,30 +24,50 @@ function collect (stream) {
entryBuffer.push(e)
}
+ stream.on("proxy", proxyPause)
+ function proxyPause (p) {
+ p.pause()
+ }
+
+
// replace the pipe method with a new version that will
// unlock the buffered stuff.
stream.pipe = (function (orig) { return function (dest) {
+ if (!dest) {
+ // console.error(" <->-<->- no dest, just unblock")
+ stream.pipe = orig
+ stream.removeListener("entry", saveEntry)
+ stream.removeListener("data", save)
+ stream.removeListener("end", save)
+ stream.resume()
+ return
+ }
+
+ // console.error(" restore pipe", stream.path, "->", dest.path)
// let the entries flow through one at a time.
// Once they're all done, then we can resume completely.
var e = 0
;(function unblockEntry () {
var entry = entryBuffer[e++]
if (!entry) return resume()
+ console.error(" unblockEntry", entry && entry.path)
entry.on("end", unblockEntry)
dest.addEntry(entry)
})()
function resume () {
- stream.removeListener("entry", saveEntry)
- stream.removeListener("data", save)
- stream.removeListener("end", save)
+ console.error(" resume", stream.path, buf)
buf.forEach(function (b) {
if (b) dest.write(b)
else dest.end()
})
+ stream.removeListener("entry", saveEntry)
+ stream.removeListener("data", save)
+ stream.removeListener("end", save)
+
stream.pipe = orig
stream.pipe(dest)
stream.resume()
View
@@ -0,0 +1,171 @@
+// A thing that emits "entry" events with Reader objects
+// Pausing it causes it to stop emitting entry events, and also
+// pauses the current entry if there is one.
+
+module.exports = DirReader
+
+var fs = require("graceful-fs")
+ , fstream = require("../fstream.js")
+ , Reader = fstream.Reader
+ , inherits = require("inherits")
+ , mkdir = require("mkdirp")
+ , path = require("path")
+ , Reader = require("./reader.js")
+
+inherits(DirReader, Reader)
+
+function DirReader (props) {
+ var me = this
+ if (!(me instanceof DirReader)) throw new Error(
+ "DirReader must be called as constructor.")
+
+ // should already be established as a Directory type
+ if (props.type !== "Directory" || !props.Directory) {
+ throw new Error("Non-directory type "+ props.type)
+ }
+
+ me._entries = null
+ me._index = -1
+ me._paused = false
+ me._length = -1
+
+ // me._read = function () {
+ // process.nextTick(DirReader.prototype._read.bind(me))
+ // }
+ Reader.call(this, props)
+}
+
+DirReader.prototype._getEntries = function () {
+ var me = this
+ fs.readdir(me.path, function (er, entries) {
+ if (er) return me.emit("error", er)
+ me._entries = entries
+ me._length = entries.length
+ me._read()
+ })
+}
+
+// start walking the dir, and emit an "entry" event for each one.
+DirReader.prototype._read = function () {
+ var me = this
+
+ if (!me._entries) return me._getEntries()
+
+ console.error("\n\nDIRREADERREAD")
+ if (me._paused || me._currentEntry) {
+ console.error(" (oh, i see you're busy)")
+ return
+ }
+
+ me._index ++
+ console.error("\n\nDirReader %s[%d]=%j",
+ me.path, me._index, me._entries[me._index])
+
+ if (me._index >= me._length) {
+ return me.emit("end")
+ }
+
+ // ok, handle this one, then.
+
+ // save creating a proxy, by stat'ing the thing now.
+ var p = path.resolve(me.path, me._entries[me._index])
+ // set this to prevent trying to _read() again in the stat time.
+ me._currentEntry = p
+ console.error(" currentEntry=%s", p)
+ fs[ me.props.follow ? "stat" : "lstat" ](p, function (er, stat) {
+ if (er) return me.emit("error", er)
+
+ console.error(" statted!", me._currentEntry === p)
+
+ var entry = Reader({ path: p
+ , depth: me.depth + 1
+ , root: me.root || me
+ , parent: me
+ , follow: me.follow
+ , filter: me.filter
+ }, stat)
+
+ me._currentEntry = entry
+
+ console.error(" created child entry", !!entry, entry && entry.path, p)
+
+ // "entry" events are for direct entries in a specific dir.
+ // "child" events are for any and all children at all levels.
+ // This nomenclature is not completely final.
+
+ // entry.on("pause", function () {
+ // if (!me._paused) {
+ // console.error("child pause!")
+ // me.pause()
+ // }
+ // })
+
+ entry.on("resume", function () {
+ if (me._paused) {
+ console.error(" child resume!", p)
+ me.resume()
+ }
+ })
+
+ entry.on("ready", function () {
+ console.error(" ready", p)
+ me.emit("entry", entry)
+ me.emit("child", entry)
+ })
+
+ var ended = false
+ entry.on("end", onend)
+ entry.on("close", onend)
+ function onend () {
+ if (ended) return
+ ended = true
+ console.error(" ending fuck fuck fuck", entry && entry.path, p)
+ me.emit("childEnd", entry)
+ me.emit("entryEnd", entry)
+ me._currentEntry = null
+ me._read()
+ }
+
+ // proxy up some events.
+
+ entry.on("data", function (c) {
+ me.emit("data", c)
+ })
+
+ entry.on("error", function (er) {
+ me.emit("error", er)
+ })
+
+ entry.on("child", function (child) {
+ me.emit("child", child)
+ })
+
+ entry.on("childEnd", function (child) {
+ me.emit("childEnd", child)
+ })
+
+ })
+}
+
+DirReader.prototype.pause = function () {
+ var me = this
+ if (me._paused) return
+ // console.error(" pause", me.path, new Error().stack)
+ me._paused = true
+ if (me._currentEntry && me._currentEntry.pause) {
+ me._currentEntry.pause()
+ }
+ // me.emit("pause")
+}
+
+DirReader.prototype.resume = function () {
+ var me = this
+ // if (!me._paused) return
+ // console.error("\n\nresume", me.path)
+ me._paused = false
+ if (me._currentEntry && me._currentEntry.resume) {
+ // console.error(" resume currentEntry", me._currentEntry.path)
+ me._currentEntry.resume()
+ } else me._read()
+ me.emit("resume")
+}
View
@@ -8,7 +8,7 @@ module.exports = DirWriter
var fs = require("graceful-fs")
, fstream = require("../fstream.js")
- , Writer = fstream.Writer
+ , Writer = require("./writer.js")
, inherits = require("inherits")
, mkdir = require("mkdirp")
, path = require("path")
@@ -23,7 +23,8 @@ function DirWriter (props) {
// should already be established as a Directory type
if (props.type !== "Directory" || !props.Directory) {
- throw new Error("Non-directory type "+ props.type)
+ throw new Error("Non-directory type "+ props.type + " " +
+ JSON.stringify(props))
}
Writer.call(this, props)
@@ -54,6 +55,9 @@ DirWriter.prototype.end = function () {
DirWriter.prototype.add = function (entry) {
var me = this
+
+ console.error("add %s <- %s", me.path, entry.path)
+
collect(entry)
if (!me._ready || me._currentEntry) {
me._buffer.push(entry)
@@ -68,15 +72,19 @@ DirWriter.prototype.add = function (entry) {
me._buffer.push(entry)
me._process()
- return false
+ return 0 === this._buffer.length
}
DirWriter.prototype._process = function () {
var me = this
+
+ console.error("process %j %s", me._processing, me.path)
+
if (me._processing) return
var entry = me._buffer.shift()
if (!entry) {
+ console.error("draining")
me.emit("drain")
if (me._ended) me._finish()
return
@@ -89,36 +97,43 @@ DirWriter.prototype._process = function () {
// don't allow recursive copying
var p = entry
do {
- if (p.path === me.path) {
+ console.error("recursion?\n %s\n %s", me.root.path, p.path)
+ if (p.path === me.root.path || p.path === me.path) {
+ console.error(" yes, recursion")
me._processing = false
+ if (entry._collected) entry.pipe()
return me._process()
}
} while (p = p.parent)
+ console.error("not recursive\n\n")
+
// chop off the entry's root dir, replace with ours
- var opts = { parent: me
- , root: me.root || me
- , type: entry.type
- , depth: me.depth + 1 }
+ var props = { parent: me
+ , root: me.root || me
+ , type: entry.type
+ , depth: me.depth + 1 }
var p = entry.path || entry.props.path
- , root = entry.root || entry.parent
- if (root) {
- p = p.substr(root.path.length + 1)
+ if (entry.parent) {
+ p = p.substr(entry.parent.path.length + 1)
}
- opts.path = path.join(me.path, path.join("/", p))
+ props.path = path.join(me.path, path.join("/", p))
// all the rest of the stuff, copy over from the source.
Object.keys(entry.props).forEach(function (k) {
- if (!opts.hasOwnProperty(k)) {
- opts[k] = entry.props[k]
+ if (!props.hasOwnProperty(k)) {
+ props[k] = entry.props[k]
}
})
// not sure at this point what kind of writer this is.
- var child = me._currentChild = new Writer(opts)
+ var child = me._currentChild = new Writer(props)
+ console.error("\n\n\n\n\npipetarget child", child.path)
child.on("ready", function () {
+ console.error("pipe", entry.path, "->", child.path)
entry.pipe(child)
+ entry.resume()
})
child.on("end", onend)
Oops, something went wrong. Retry.

0 comments on commit c930b54

Please sign in to comment.