Skip to content

Commit

Permalink
Cache every tweet in a capped MongoDB collection; when a message arri…
Browse files Browse the repository at this point in the history
…ves via websocket with bounding box coordinates, query the last 10 tweets within those coordinates and send them down the socket; when the map has settled, the client removes any tweets not within the viewport, and declines to add any that are already in the collection
  • Loading branch information
scottburton11 committed Aug 24, 2011
1 parent 661ddea commit f9fe681
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 12 deletions.
3 changes: 3 additions & 0 deletions Gemfile
Expand Up @@ -3,5 +3,8 @@ source :rubygems
gem 'eventmachine', :git => 'git://github.com/eventmachine/eventmachine.git'
gem 'em-http-request',:git => 'https://github.com/igrigorik/em-http-request.git', :require => 'em-http'
gem 'em-websocket'
gem 'bson', "1.3.0"
gem 'bson_ext', "1.3.0"
gem 'em-mongo'
gem 'simple_oauth'
gem 'json'
13 changes: 11 additions & 2 deletions Gemfile.lock
@@ -1,12 +1,12 @@
GIT
remote: git://github.com/eventmachine/eventmachine.git
revision: 3c8ea2ed3d81190eb204fecdf8e9421c7d3f19c9
revision: 46e3bc256c1733ec67a33ea69c54812900e63d76
specs:
eventmachine (1.0.0.beta.4)

GIT
remote: https://github.com/igrigorik/em-http-request.git
revision: 339e5e54c61a4e482ad8e82e137a51e2fecc568c
revision: 4c00586a9d5e7f6f31df70b39525e595f6f6c86d
specs:
em-http-request (1.0.0.beta.4)
addressable (>= 2.2.3)
Expand All @@ -18,6 +18,12 @@ GEM
remote: http://rubygems.org/
specs:
addressable (2.2.6)
bson (1.3.0)
bson_ext (1.3.0)
em-mongo (0.4.0)
bson (>= 1.1.3)
bson_ext (>= 1.1.3)
eventmachine (>= 0.12.10)
em-socksify (0.1.0)
eventmachine
em-websocket (0.3.1)
Expand All @@ -31,7 +37,10 @@ PLATFORMS
ruby

DEPENDENCIES
bson (= 1.3.0)
bson_ext (= 1.3.0)
em-http-request!
em-mongo
em-websocket
eventmachine!
json
Expand Down
26 changes: 18 additions & 8 deletions javascripts/application.js
@@ -1,5 +1,5 @@
var lastOpened = null;
var WEBSOCKET_HOST = "";
var WEBSOCKET_HOST = "0.0.0.0";

var infoTemplate = "<div class='tweet-bubble'><img src='<%= user.profile_image_url %>' alt='<%= user.username %>' align='left'/><div><strong><%= user.name %></strong><div><%= text %></div></div></div>";
var tweetTemplate = "<div class='img-col'><img src='<%= user.profile_image_url %>'/></div><div class='tweet-col'><strong><%= user.screen_name %></strong>&nbsp;<%= text %></div><div class='clear'></div>"
Expand Down Expand Up @@ -87,9 +87,6 @@ var TweetsView = Backbone.View.extend({
var tweetView = new TweetView({
model: tweet
});
//$(tweetView.el).click(function() {
// tweetView.showInfoWindow();
//});
$("#tweetbar").prepend(tweetView.render().el);
tweetView.placeMarker(map);
}
Expand All @@ -107,14 +104,26 @@ function addTweet(json) {
var tweet = new Tweet({
text: json.text,
latlng: json.coordinates,
user: json.user
user: json.user,
id: json.id
});

tweets.add(tweet);
var included = tweets.detect(function(tweet) { return tweet.id === json.id });
if (!included) {
tweets.add(tweet);
}
};

function cleanUpTweets() {
var remove_tweets = tweets.reject(function(tweet) {
return withinMapView(tweet.get("latlng").coordinates);
});
tweets.remove(remove_tweets);
}

function handleMapMove(){
setWindowBounds();
cleanUpTweets();
window.ws.send(window.map.getBounds());
}

Expand Down Expand Up @@ -146,8 +155,9 @@ $(function(){
ws.onopen = function(){
// Send the map bounds to start getting Tweets
ws.send(map.getBounds());
google.maps.event.addListener(window.map, "dragend", handleMapMove);
google.maps.event.addListener(window.map, "bounds_changed", handleMapMove);
//google.maps.event.addListener(window.map, "dragend", handleMapMove);
//google.maps.event.addListener(window.map, "zoom_changed", handleMapMove);
google.maps.event.addListener(window.map, "idle", handleMapMove);
}

ws.onmessage = function(msg) {
Expand Down
29 changes: 27 additions & 2 deletions twitstream.rb
Expand Up @@ -4,6 +4,7 @@
require 'em-websocket'
require 'em-http-request'
require 'em-http/middleware/oauth'
require 'em-mongo'
require 'json'

#################
Expand All @@ -19,6 +20,12 @@

stream_uri = "http://stream.twitter.com/1/statuses/filter.json"

MONGODB_HOST = "localhost"
MONGODB_DATABASE = "twitstream"

HUNDRED_MB = 104857600
TEN_MB = 10485760

# Four geographic US areas
#
# locs = {
Expand Down Expand Up @@ -76,7 +83,7 @@ def longitude

def within?(bounds)
return false unless bounds.all?
latitude > bounds[0] && longitude > bounds[1] && latitude < bounds[2] && longitude < bounds[3]
latitude > bounds[0] && longitude > bounds[1] && latitude < bounds[2] && longitude < bounds[3] rescue false
end
end

Expand Down Expand Up @@ -106,14 +113,32 @@ def within?(bounds)
end
}

EM::WebSocket.start(:host => "0.0.0.0", :port => "8080") do |ws|
db = EM::Mongo::Connection.new(MONGODB_HOST).db(MONGODB_DATABASE)
coll = db.collection("tweets")
db.command({"convertToCapped" => "tweets", "size" => TEN_MB})
coll.create_index([["coordinates.coordinates", EM::Mongo::GEO2D]])


msid = channel.subscribe do |tweet|
coll.insert(tweet.data)
end

EM::WebSocket.start(:host => "0.0.0.0", :port => "8080") do |ws|
ws.onopen do
sid = nil

ws.onmessage do |msg|
channel.unsubscribe(sid) if sid
bounds = msg.scan(/([\d\-\.]+)/).map{|c| c.first.to_f}
tbounds = [[bounds[1], bounds[0]], [bounds[3], bounds[2]]]
coll.find({"coordinates.coordinates" => {"$within" => {"$box" => tbounds}}}).limit(10).each do |doc|
if doc
tweet = Tweet.new(doc)
ws.send(tweet.to_json) if tweet.coordinates && tweet.point?
end
end


sid = channel.subscribe do |tweet|
EM.next_tick do
ws.send tweet.to_json if tweet.locatable? && tweet.within?(bounds)
Expand Down

0 comments on commit f9fe681

Please sign in to comment.