Skip to content

Commit

Permalink
more reliable twitter stream processing
Browse files Browse the repository at this point in the history
  • Loading branch information
technoweenie committed Jan 27, 2010
1 parent b2ccdb0 commit 255aaf7
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 27 deletions.
81 changes: 55 additions & 26 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,38 +58,67 @@ TwitterNode.prototype.stream = function() {
return this;
}

TwitterNode.prototype.processTweet = function(text) {
if (this.format == 'json') {
this.chunks += text

// quick check for valid JSON
if(!this.chunks.match(this.startJSON) || !this.chunks.match(this.endJSON))
return
// returns parsed JSON object or null if given JSON is invald.
TwitterNode.prototype.processValidJSON = function(text) {
if(!text.match(this.startJSON) || !text.match(this.endJSON)) {
return null;
}

text = this.chunks
try {
return JSON.parse(text)
} catch(err) {
// sometimes what looks like valid json from the regexes isn't:
// {"a": {"b": 1}
// starting and ending brackets, but still invalid.
// hopefully the next chunk will finish the json.
if(this.debug || err.name != 'SyntaxError') {
sys.puts(sys.inspect(text))
sys.puts(err.stack)
}
return null;
}
}

try {
var tweet = JSON.parse(text)
this.chunks = ''
if(tweet.limit) {
this.emit('limit', tweet.limit)
} else if(tweet['delete']) {
this.emit('delete', tweet['delete'])
TwitterNode.prototype.processValidJSONFromPieces = function(text, pieces) {
var tweet;
while(1) {
tweet = this.processValidJSON(text)
if(!tweet) {
if(pieces.length > 0) {
text += pieces.shift();
} else {
this.emit('tweet', tweet)
}
} catch(err) {
// sometimes what looks like valid json from the regexes isn't:
// {"a": {"b": 1}
// starting and ending brackets, but still invalid.
// hopefully the next chunk will finish the json.
if(this.debug || err.name != 'SyntaxError') {
sys.puts(text)
sys.puts(err.stack)
return [tweet, text]
}
} else {
return [tweet, pieces.join("\n")]
}
}

return [tweet, text]
}

TwitterNode.prototype.processTweet = function(incoming) {
if (this.format == 'json') {

var pieces = (this.chunks + incoming).split("\n")
var text = pieces.shift()

var tweet_and_chunk = this.processValidJSONFromPieces(text, pieces)

var tweet = tweet_and_chunk[0]
this.chunks = tweet_and_chunk[1]

if(!tweet) {
return
} else if(tweet.limit) {
this.emit('limit', tweet.limit)
} else if(tweet['delete']) {
this.emit('delete', tweet['delete'])
} else {
this.emit('tweet', tweet)
}
} else {
this.emit('tweet', text)
this.emit('tweet', incoming)
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/twitter_node_config_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe("json TwitterNode instance")
this.twit.processTweet("{")
this.twit.processTweet('"a":{')
this.twit.processTweet('"b":1}')
this.twit.processTweet("}")
this.twit.processTweet("}\n{\"a\":1}")

if(!promise.hasFired && promise._blocking) promise.wait()

Expand Down

0 comments on commit 255aaf7

Please sign in to comment.