Skip to content

Commit

Permalink
Correct readStream behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardobeat committed Nov 22, 2012
1 parent e7889ba commit 2132feb
Showing 1 changed file with 35 additions and 25 deletions.
60 changes: 35 additions & 25 deletions src/indexedup.js
Expand Up @@ -288,30 +288,28 @@ ReadableStream.prototype.init = function(options) {
, options = this._options

req.onsuccess = function(e) {
var result = e.target.result
var cursor = e.target.result
, data = null

if (!result) return
if (!cursor) {
self.end()
return
}

if (options.keys && options.values) {
data = data.value
data = cursor.value
} else if (options.keys && !options.values) {
data = data.key
data = cursor.value.key
} else if (options.values) {
data = data.value
data = cursor.value.value
}

self.emit('data', result)

return result['continue']()
self.emit('data', data)
cursor['continue']()
}

req.onerror = function(err) {
return self.emit('error', err)
}

return transaction.oncomplete = function() {
return self.end()
self.emit('error', err)
}
}

Expand All @@ -323,7 +321,8 @@ function WritableStream(idb, options) {
this.idb = idb
this.writable = true
this.buffer = []
process.nextTick(this.init.bind(this))
this._end = false
this.scheduled = false
this.flushWrites = this.flushWrites.bind(this)
}

Expand All @@ -335,43 +334,54 @@ WritableStream.prototype.write = function(data) {
}
if (this.buffer.length === 0) {
process.nextTick(this.flushWrites)
this.scheduled = true
}

data.type = 'put'
this.buffer.push(data)

return true
}

WritableStream.prototype.flushWrites = function() {
if (!this.writable) return

this.scheduled = false

if (!this.writable) {
return
}

var self = this

if (this.buffer.length === 1) {
var data = this.buffer.shift()
this.idb.put(data.key, data.value, function(err) {
if (err) self.error(err)
if (err) self.emit('error', err)
})
} else if (this.buffer.length > 1) {
var ops = this.buffer.map(function(o){
o.type = 'put'
})
this.idb.batch(this.buffer, function(err) {
if (err) self.error(err)
})
if (err) self.emit('error', err)
})
}
}

WritableStream.prototype.error = function(err) {
this.emit('error', err)
this.end()
if (this._end) {
this.writable = false
this.emit('close')
}
}

WritableStream.prototype.destroy = function() {
this.writable = false
this.end()
}

WritableStream.prototype.end = function(){
this.writable = false
this.emit('close')
this._end = true
if (!this.scheduled) {
process.nextTick(this.flushWrites)
}
}

IUDatabase.prototype.readStream = function() {
Expand Down

0 comments on commit 2132feb

Please sign in to comment.