A prototype app to showcase EventMachine, MongoDB, and Twitter streaming API.
After checking out the Rails 3 app, edit the Twitter configuration file with your Twitter app credentials and OAuth token:
# modify config/twitter.yml
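The exact keys read from config/twitter.yml aren't spelled out here, but a typical layout for TweetStream-style OAuth credentials would look roughly like this (the key names below are an assumption; match them to whatever the app actually reads):

    # config/twitter.yml -- hypothetical key names
    consumer_key:       YOUR_CONSUMER_KEY
    consumer_secret:    YOUR_CONSUMER_SECRET
    oauth_token:        YOUR_ACCESS_TOKEN
    oauth_token_secret: YOUR_ACCESS_TOKEN_SECRET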
Install the gems, then run the following rake task to initialize the MongoDB database:
bundle install
bundle exec rake db:mongo:init
Run the following rake task to connect to the Twitter Streaming API. Use Ctrl-C or kill -TERM to stop it.
bundle exec rake twitter:stream
The task downloads the latest geo-enabled statuses and persists them in the MongoDB database.
Using zeus to run Rails-related commands is recommended.
After the 'twitter:stream' rake task has run for a while, your local tweet data will be ready and you can search based on the geo coordinates you enter. Start the Rails server, then try the following URL:
http://RAILS_SERVER/
Upon a successful search, the page returns a list of tweets near the geo location of your input. The tweets are ordered by how close they are to your search coordinates, in radius tiers: 1 km, then 2 km, 5 km, and 10 km, in that order. The search won't return results outside the 10 km radius. The radius tiers can be configured in search_criteria.yml. Within each radius, the results are ordered reverse chronologically.
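For reference, the tiered lookup described above could be sketched in Mongoid roughly as follows. The Tweet model, its coordinates and created_at fields, and the km-to-degrees conversion for the legacy 2d index are all assumptions, not the app's actual code:

    # Rough sketch of the tiered radius search (hypothetical model/field names).
    # With a legacy '2d' index, $maxDistance is expressed in coordinate degrees,
    # so kilometres are converted at roughly 111.12 km per degree.
    KM_PER_DEGREE = 111.12

    def nearby_tweets(lng, lat, tiers_km = [1, 2, 5, 10])
      seen_ids = []
      tiers_km.flat_map do |km|
        ring = Tweet.where(
          :coordinates => { '$near' => [lng, lat], '$maxDistance' => km / KM_PER_DEGREE }
        ).reject { |t| seen_ids.include?(t.id) }   # skip tweets already returned by a smaller tier
        seen_ids.concat(ring.map(&:id))
        ring.sort_by(&:created_at).reverse         # reverse chronological within the tier
      end
    end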
If you want to drop the database, run:
bundle exec rake db:mongo:drop
Because Mongoid no longer supports creating capped collections, make sure you run the rake task to create the schema before running the tests:
RAILS_ENV=test bundle exec rake db:mongo:drop db:mongo:init
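Assuming Mongoid 3 with Moped, the init task presumably issues MongoDB's 'create' command itself to get a capped collection; a sketch of what that might look like (the collection name and size are placeholders, not the app's actual values):

    # Hypothetical sketch: create a capped collection by hand,
    # since Mongoid no longer does it for you.
    Mongoid.default_session.command(
      create: 'tweets',           # assumed collection name
      capped: true,
      size:   100 * 1024 * 1024   # maximum size in bytes
    )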
The app uses the TweetStream gem, which is an EventMachine-based implementation. The app has only been tested with Ruby 1.9.3 and MongoDB 2.4.4.
The idea is to get tweets fast and miss fewer tweets from the Twitter streaming data. The Twitter streaming server is the data producer; our client is the data consumer.
The streaming API states that 'each account may create only one standing connection to the public endpoints', so the client has little choice but to live with the status inflow of a single connection. The physical network may affect how much data comes in, but how fast the client gets the data is mostly capped by the server.
Now the question becomes whether the consumer/client can consume the data sent by the server fast enough. Chasing the call sequence, you can find that the TweetStream client uses the client provided by em-twitter, which is an EM::Connection client that connects to the server. The client is managed by EM and runs in the main EM event loop. Whenever the reactor is told that data has arrived on the socket, it reads the system I/O buffer and invokes the receive_data callback, which buffers the data internally. Each chunk of incoming data may not be complete, but the client decodes the chunks into usable packets. The TweetStream client then gets the data through the EM::Twitter::Client#each iterator, which eventually invokes the status-handling callback we defined as a block when we initialized the client.
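In code, that callback is just the block handed to the TweetStream client. The sketch below shows its shape only; the credential hash, the filter parameters, and the handle_status helper are assumptions, not the app's actual code:

    # Sketch: the block passed to the client is the status-handling callback
    # discussed above. It runs inside the main EM event loop, so anything slow
    # here blocks the reactor.
    TweetStream.configure do |config|
      config.consumer_key       = twitter_config['consumer_key']        # hypothetical config hash
      config.consumer_secret    = twitter_config['consumer_secret']
      config.oauth_token        = twitter_config['oauth_token']
      config.oauth_token_secret = twitter_config['oauth_token_secret']
      config.auth_method        = :oauth
    end

    TweetStream::Client.new.filter(:locations => '-180,-90,180,90') do |status|
      handle_status(status)   # hypothetical handler; called once per decoded status
    end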
So the status-handling callback is executed inside the main EM event loop, and it can block the loop if it's too expensive. The 'twitter_streaming.rake' file includes three different approaches to this. Case 1 (the twitter:stream task) uses a Fiber. Case 2 (twitter:stream2) is the simplest and does the database insertion directly inside the callback. Case 3 (twitter:stream3) uses EM.defer to run the database insertion in the background as an asynchronous task. Case 3 is my personal favorite because it increases the capacity of the data consumer: if the server's data inflow speeds up, it can keep up, while the other two cases may eventually lose data, since everything depends on how much incoming data the system I/O buffer can hold. When you run the twitter:stream3 rake task you can observe about 20 extra connections to MongoDB, which indirectly confirms that EM.defer's thread pool size is 20. The interesting part is that the test results show all three approaches persist tweets at roughly 1000 statuses per 23 or 24 seconds, most likely because the consumer in each case is fast enough to keep up with the server's data.
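A minimal sketch of the Case 3 shape, with the actual persistence reduced to a hypothetical Tweet.create_from_status helper:

    # The callback returns immediately; the insertion runs on EM's thread pool
    # (20 threads by default), so a slow MongoDB write doesn't block the reactor.
    TweetStream::Client.new.filter(:locations => '-180,-90,180,90') do |status|
      EM.defer do
        Tweet.create_from_status(status)   # hypothetical persistence helper
      end
    end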
A Ruby Fiber can be created every time the main TweetStream client callback block is invoked (the Case 1 approach), but that imposes the overhead of Fiber creation, even though fibers are lightweight.
There's another approach to the same problem: abandon the TweetStream gem (and thus EM::Connection#receive_data) and use em-http-request directly. That gives you more control over the EM event loop, but you face the same blocking concern, and it can also run into problems by connecting to Twitter too often.
The geo data uses Mongo's '2d' geospatial index, not the newer '2dsphere' index.
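With Mongoid 3 the index declaration would look something like this (the model and field name are assumptions):

    # Hypothetical Tweet model excerpt: a legacy '2d' index on the coordinates field.
    class Tweet
      include Mongoid::Document

      field :coordinates, type: Array   # [longitude, latitude]

      index({ coordinates: '2d' })
    end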
Given that the Twitter streaming API never sends the same status twice, no unique index on the tweet/status id is enforced in the database.
Testing suggests that the geo coordinates [0, 0] indicate a data error from Twitter; they should be handled more appropriately.
Copyright (c) 2013 stonelonely and contributors, released under the MIT license.