Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
makeusabrew committed Nov 23, 2011
2 parents 1f62e33 + b6d980a commit b192397
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 181 deletions.
11 changes: 0 additions & 11 deletions app/routes.js

This file was deleted.

150 changes: 63 additions & 87 deletions app/workers/stream_processor.js
@@ -1,106 +1,82 @@
var Throughput = require('../throughput');
require('colors');

var push = null;
var pull = null;

var throughput = new Throughput();

var StreamProcessor = function() {
this.start = function() {
var lastChunk = new Date();
var thisChunk = null;

pull.on('message', function(data) {
var rate = throughput.measure(data);
console.log("processing message ("+rate.value+" "+rate.unit+")");
var processed = null;
try {
processed = JSON.parse(data.toString('utf8'));
} catch (e) {
// couldn't parse
console.log("parse error of: "+data.toString('utf8'));
return;
}
if (processed.text == null) {
console.log("discarding message with bad format - assuming delete or rate limit info".bold);
console.log(processed);
return;
this.process = function(data) {
var rate = throughput.measure(data);
console.log("processing message ("+rate.value+" "+rate.unit+")");
var processed = null;
try {
processed = JSON.parse(data.toString('utf8'));
} catch (e) {
// couldn't parse
throw new Error("parse error of: "+data.toString('utf8'));
}
if (processed.text == null) {
throw new Error("discarding message with bad format - assuming delete or rate limit info".bold);
}

var filter = new RegExp("fuck|shit|bollocks|\bdick\b|pussy|cunt|\bporn\b|\bsex\b", "i");

var fullTweet = processed.user.screen_name+": "+processed.text;

if (filter.test(fullTweet)) {
throw new Error("PROFANITY FILTER:".red+" ["+fullTweet+"]");
}

var orderedEntities = [];
var entityTypes = ['urls', 'media', 'hashtags', 'user_mentions'];
var i = entityTypes.length;

// let's make a flat array of entities
while (i--) {
var eType = entityTypes[i];
if (typeof processed.entities[eType] == 'undefined') {
// media is a new entity so isn't always present
continue;
}

var filter = new RegExp("fuck|shit|bollocks|\bdick\b|pussy|cunt|\bporn\b|\bsex\b", "i");

var fullTweet = processed.user.screen_name+": "+processed.text;

if (filter.test(fullTweet)) {
console.log("PROFANITY FILTER:".red+" ["+fullTweet+"]");
return;
}

var orderedEntities = [];
var entityTypes = ['urls', 'media', 'hashtags', 'user_mentions'];
var i = entityTypes.length;

// let's make a flat array of entities
while (i--) {
var eType = entityTypes[i];
if (typeof processed.entities[eType] == 'undefined') {
// media is a new entity so isn't always present
continue;
}

var j = processed.entities[eType].length;
var j = processed.entities[eType].length;

var matchDups = false;
if (eType == 'urls') {
// spam stuff
if (j >= 4) {
console.log("SPAM FILTER:".yellow+" excessive link volume ("+j+"): "+fullTweet);
return;
} else if (j == 3 && processed.user.followers_count < 50 ||
j == 2 && processed.user.followers_count < 10 ||
j == 1 && processed.user.followers_count == 0) {
var matchDups = false;
if (eType == 'urls') {
// spam stuff
if (j >= 4) {
throw new Error("SPAM FILTER:".yellow+" excessive link volume ("+j+"): "+fullTweet);
} else if (j == 3 && processed.user.followers_count < 50 ||
j == 2 && processed.user.followers_count < 10 ||
j == 1 && processed.user.followers_count == 0) {

console.log("SPAM FILTER:".yellow+" excessive link Vs follower count ("+j+" Vs "+processed.user.followers_count+"): "+fullTweet);
return;
throw new Error("SPAM FILTER:".yellow+" excessive link Vs follower count ("+j+" Vs "+processed.user.followers_count+"): "+fullTweet);

} else if (j >= 2) {
matchDups = true;
}
}
while (j--) {
var entity = processed.entities[eType][j];
entity.eType = eType;
orderedEntities.push(entity);
} else if (j >= 2) {
matchDups = true;
}
}
while (j--) {
var entity = processed.entities[eType][j];
entity.eType = eType;
orderedEntities.push(entity);
}
}

// get the entities array in ascending order
orderedEntities.sort(function(a, b) {
return a.indices[0] - b.indices[0];
});

var tweetData = {
"id" : processed.id,
"text" : processed.text,
"entities" : orderedEntities,
"user": {
"followers_count": processed.user.followers_count,
"screen_name": processed.user.screen_name
}
};
push.send(JSON.stringify(tweetData));
// get the entities array in ascending order
orderedEntities.sort(function(a, b) {
return a.indices[0] - b.indices[0];
});
}

this.setPushSocket = function(_socket) {
push = _socket;
console.log("set push socket");
}

this.setPullSocket = function(_socket) {
pull = _socket;
console.log("set pull socket");
var tweetData = {
"id" : processed.id,
"text" : processed.text,
"entities" : orderedEntities,
"user": {
"followers_count": processed.user.followers_count,
"screen_name": processed.user.screen_name
}
};
return JSON.stringify(tweetData);
}
}

Expand Down
14 changes: 10 additions & 4 deletions processor.js
Expand Up @@ -16,7 +16,13 @@ console.log("push: "+pushEndpoint);

push.connect(pushEndpoint);

processor.setPushSocket(push);
processor.setPullSocket(pull);

processor.start();
pull.on('message', function(data) {
try {
push.send(
processor.process(data)
);
} catch (e) {
// something's amiss. Let's log what.
console.log(e.toString());
}
});
15 changes: 1 addition & 14 deletions server.js
Expand Up @@ -2,30 +2,17 @@

var express = require('express'),
app = express.createServer(),
io = require('socket.io').listen(app),
io = require('socket.io').listen(7979),
fs = require('fs'),
zmq = require('zmq');

var Throughput = require('./app/throughput');

app.listen(7979);

app.configure(function() {
var oneYear = 31557600000;
app.use(express.static(__dirname + '/public', {maxAge: oneYear}));
app.set('view engine', 'jade');
app.set('view options', {
'layout': false
});
});

io.configure(function() {
//io.set('transports', ['websocket']);
io.set('log level', 2); // info
});

require('./app/routes')(app);

var queue = zmq.createSocket('pull');

var throughput = new Throughput();
Expand Down
60 changes: 60 additions & 0 deletions tests/server/spec/StreamProcessorSpec.js
@@ -0,0 +1,60 @@
require('colors');
var srcDir = __dirname+'/../../..';

var StreamProcessor = require(srcDir+'/app/workers/stream_processor');

describe('StreamConsumer', function() {
var processor;
beforeEach(function() {
processor = new StreamProcessor();
});

it('should emit an error message when processing invalid JSON', function() {
expect(function() {
processor.process("Invalid JSON")
}).toThrow(
new Error("parse error of: Invalid JSON")
);
});

it('should emit an eror when processing JSON without any text', function() {
expect(function() {
processor.process('{"valid":"json"}');
}).toThrow(
new Error("discarding message with bad format - assuming delete or rate limit info".bold)
);
});

it('should emit an error when encountering a swear word in the tweet text', function() {
expect(function() {
processor.process('{"text":"I don\'t give a fuck", "user":{"screen_name":"Foo"}}');
}).toThrow(
new Error("PROFANITY FILTER:".red+" [Foo: I don't give a fuck]")
);
});

it('should emit an error a user with no followers shares a link', function() {
var tweet = {
"text": "This is a link http://foo.com",
"user": {
"screen_name": "Foo",
"followers_count": 0,
},
"entities": {
"urls": [
{
"url" : "http://foo.com"
}
]
}
};

tweet = JSON.stringify(tweet);

expect(function() {
processor.process(tweet);
}).toThrow(
new Error("SPAM FILTER:".yellow+" excessive link Vs follower count (1 Vs 0): Foo: This is a link http://foo.com")
);
});
});
54 changes: 0 additions & 54 deletions views/index.jade

This file was deleted.

11 changes: 0 additions & 11 deletions views/layout.jade

This file was deleted.

0 comments on commit b192397

Please sign in to comment.