From 12cb5dd855a8771c271742f8011e2230bde57923 Mon Sep 17 00:00:00 2001 From: Erik Michaels-Ober Date: Sun, 11 Aug 2013 11:00:14 +0200 Subject: [PATCH] Implement Twitter::Streaming::Client --- Gemfile | 2 + lib/twitter.rb | 1 + lib/twitter/streaming/client.rb | 75 ++++++++++++++ lib/twitter/streaming/connection.rb | 29 ++++++ lib/twitter/streaming/proxy.rb | 25 +++++ lib/twitter/streaming/request.rb | 134 ++++++++++++++++++++++++++ lib/twitter/streaming/response.rb | 25 +++++ spec/fixtures/track_streaming.json | 3 + spec/twitter/streaming/client_spec.rb | 31 ++++++ twitter.gemspec | 3 + 10 files changed, 328 insertions(+) create mode 100644 lib/twitter/streaming/client.rb create mode 100644 lib/twitter/streaming/connection.rb create mode 100644 lib/twitter/streaming/proxy.rb create mode 100644 lib/twitter/streaming/request.rb create mode 100644 lib/twitter/streaming/response.rb create mode 100644 spec/fixtures/track_streaming.json create mode 100644 spec/twitter/streaming/client_spec.rb diff --git a/Gemfile b/Gemfile index df5677a9d..5de85a054 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,8 @@ gem 'yard' group :development do gem 'kramdown' gem 'pry' + gem 'pry-rescue' + gem 'pry-stack_explorer' gem 'pry-debugger', :platforms => :mri_19 end diff --git a/lib/twitter.rb b/lib/twitter.rb index ed644602b..a08434b14 100644 --- a/lib/twitter.rb +++ b/lib/twitter.rb @@ -22,6 +22,7 @@ require 'twitter/settings' require 'twitter/size' require 'twitter/source_user' +require 'twitter/streaming/client' require 'twitter/suggestion' require 'twitter/target_user' require 'twitter/trend' diff --git a/lib/twitter/streaming/client.rb b/lib/twitter/streaming/client.rb new file mode 100644 index 000000000..c443abce5 --- /dev/null +++ b/lib/twitter/streaming/client.rb @@ -0,0 +1,75 @@ +require 'twitter/client' +require 'twitter/streaming/connection' +require 'twitter/streaming/proxy' +require 'twitter/streaming/request' +require 'twitter/streaming/response' + +module Twitter + module Streaming + class Client < Twitter::Client + attr_writer :connection + + def initialize + super + @connection = Twitter::Streaming::Connection.new + @request_options = { + :auto_reconnect => true, + :content_type => 'application/x-www-form-urlencoded', + :headers => {}, + :oauth => credentials, + :port => 443, + :ssl => true, + :timeout => 0, + :user_agent => user_agent, + } + end + + def user(&block) + user!(&block).value + end + + def user!(&block) + request({ + :method => 'GET', + :host => 'userstream.twitter.com', + :path => '/1.1/user.json', + :params => {}, + }) do |data| + begin + block.call(Tweet.new(data)) + rescue StandardError => error + p(error) + end + end + end + + def track(*keywords, &block) + track!(*keywords, &block).value + end + + def track!(*keywords, &block) + options = { + :method => 'POST', + :host => 'stream.twitter.com', + :path => '/1.1/statuses/filter.json', + :params => {'track' => keywords.join(',')}, + } + request(options) do |data| + begin + block.call(Tweet.new(data)) + rescue StandardError => error + p(error) + end + end + end + + def request(options, &block) + # TODO: consider HTTP::Request + request = Twitter::Streaming::Request.new(@request_options.merge(options)) + response = Twitter::Streaming::Response.new(block) + @connection.future.stream(request, response) + end + + end + end +end diff --git a/lib/twitter/streaming/connection.rb b/lib/twitter/streaming/connection.rb new file mode 100644 index 000000000..5360ccbdb --- /dev/null +++ b/lib/twitter/streaming/connection.rb @@ -0,0 +1,29 @@ +require 'celluloid/io' +require 'http/parser' +require 'openssl' +require 'resolv' + +module Twitter + module Streaming + class Connection + include Celluloid::IO + + def stream(request, response) + client_context = OpenSSL::SSL::SSLContext.new + parser = Http::Parser.new(response) + client = Celluloid::IO::TCPSocket.new(Resolv.getaddress(request.host), request.port) + ssl_client = Celluloid::IO::SSLSocket.new(client, client_context) + ssl_client.connect + # TODO: HTTP::Request#stream + ssl_client.write(request.to_s) + + while body = ssl_client.readpartial(1024) + parser << body + end + rescue EOFError + puts "Stream ended" + end + + end + end +end diff --git a/lib/twitter/streaming/proxy.rb b/lib/twitter/streaming/proxy.rb new file mode 100644 index 000000000..38164e484 --- /dev/null +++ b/lib/twitter/streaming/proxy.rb @@ -0,0 +1,25 @@ +module Twitter + module Streaming + class Proxy + + attr_reader :user, :password, :uri + + def initialize(options = {}) + @user = options.delete(:user) + @password = options.delete(:password) + @uri = options.delete(:uri) + end + + def header + ["#{@user}:#{@password}"].pack('m').delete("\r\n") if credentials? + end + + private + + def credentials? + @user && @password + end + + end + end +end diff --git a/lib/twitter/streaming/request.rb b/lib/twitter/streaming/request.rb new file mode 100644 index 000000000..5536eafb6 --- /dev/null +++ b/lib/twitter/streaming/request.rb @@ -0,0 +1,134 @@ +require 'uri' +require 'simple_oauth' + +module Twitter + module Streaming + class Request + attr_reader :proxy, :options + + def initialize(options = {}) + @options = options + @proxy = Proxy.new(@options.delete(:proxy)) if @options[:proxy] + end + + def host + options[:host] + end + + def port + options[:port] + end + + def to_s + content = query + + data = [] + data << "#{request_method} #{request_uri} HTTP/1.1" + data << "Host: #{@options[:host]}" + + if gzip? + data << 'Connection: Keep-Alive' + data << 'Accept-Encoding: deflate, gzip' + else + data << 'Accept: */*' + end + + data << "User-Agent: #{@options[:user_agent]}" if @options[:user_agent] + if put_or_post? + data << "Content-Type: #{@options[:content_type]}" + data << "Content-Length: #{content.bytesize}" + end + data << "Authorization: #{oauth_header}" if oauth? + data << "Proxy-Authorization: Basic #{proxy.header}" if proxy? + + @options[:headers].each do |name, value| + data << "#{name}: #{value}" + end + + data << "\r\n" + data = data.join("\r\n") + data << content if post? || put? + data + end + + def proxy? + @proxy + end + + private + + def get? + request_method == 'GET' + end + + def post? + request_method == 'POST' + end + + def put? + request_method == 'PUT' + end + + def put_or_post? + put? || post? + end + + def gzip? + @options[:encoding] && @options[:encoding] == 'gzip' + end + + def request_method + @options[:method].to_s.upcase + end + + def params + flat = {} + @options[:params].each do |param, val| + next if val.to_s.empty? || (val.respond_to?(:empty?) && val.empty?) + val = val.join(",") if val.respond_to?(:join) + flat[param.to_s] = val.to_s + end + flat + end + + def query + params.map do |param, value| + [param, SimpleOAuth::Header.encode(value)].join("=") + end.sort.join("&") + end + + def oauth? + @options[:oauth] && !@options[:oauth].empty? + end + + def oauth_header + SimpleOAuth::Header.new(@options[:method], full_uri, params, @options[:oauth]) + end + + def proxy_uri + "#{uri_base}:#{@options[:port]}#{path}" + end + + def request_uri + proxy? ? proxy_uri : path + end + + def path + get? ? "#{@options[:path]}?#{query}" : @options[:path] + end + + def uri_base + "#{protocol}://#{@options[:host]}" + end + + def protocol + @options[:ssl] ? 'https' : 'http' + end + + def full_uri + proxy? ? proxy_uri : "#{uri_base}#{request_uri}" + end + + end + end +end diff --git a/lib/twitter/streaming/response.rb b/lib/twitter/streaming/response.rb new file mode 100644 index 000000000..fa6a163d6 --- /dev/null +++ b/lib/twitter/streaming/response.rb @@ -0,0 +1,25 @@ +require 'buftok' + +module Twitter + module Streaming + class Response + def initialize(block) + @block = block + @tokenizer = BufferedTokenizer.new("\r\n") + end + + def on_headers_complete(headers) + puts headers + # handle response codes + end + + def on_body(data) + @tokenizer.extract(data).each do |line| + next if line.empty? + @block.call(JSON.parse(line, :symbolize_names => true)) + end + end + + end + end +end diff --git a/spec/fixtures/track_streaming.json b/spec/fixtures/track_streaming.json new file mode 100644 index 000000000..3fcfc63ec --- /dev/null +++ b/spec/fixtures/track_streaming.json @@ -0,0 +1,3 @@ +{"created_at":"Wed Apr 06 19:13:37 +0000 2011","id":55709764298092545,"id_str":"55709764298092545","text":"The problem with your code is that it's doing exactly what you told it to do.","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":7505382,"id_str":"7505382","name":"Erik Michaels-Ober","screen_name":"sferik","location":"San Francisco","description":"Write code. Not too much. Mostly Ruby.","url":"https:\/\/github.com\/sferik","entities":{"url":{"urls":[{"url":"https:\/\/github.com\/sferik","expanded_url":null,"indices":[0,25]}]},"description":{"urls":[]}},"protected":false,"followers_count":2479,"friends_count":200,"listed_count":132,"created_at":"Mon Jul 16 12:59:01 +0000 2007","favourites_count":4421,"utc_offset":-28800,"time_zone":"Pacific Time (US & Canada)","geo_enabled":true,"verified":false,"statuses_count":8730,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"000000","profile_background_image_url":"http:\/\/a0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_tile":false,"profile_image_url":"http:\/\/a0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_banner_url":"https:\/\/si0.twimg.com\/profile_banners\/7505382\/1349499693","profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"default_profile":false,"default_profile_image":false,"following":false,"follow_request_sent":false,"notifications":false},"geo":{"type":"Point","coordinates":[37.78349999,-122.39362884]},"coordinates":{"type":"Point","coordinates":[-122.39362884,37.78349999]},"place":{"id":"5c92ab5379de3839","url":"https:\/\/api.twitter.com\/1.1\/geo\/id\/5c92ab5379de3839.json","place_type":"neighborhood","name":"South Beach","full_name":"South Beach, San Francisco","country_code":"US","country":"United States","bounding_box":{"type":"Polygon","coordinates":[[[-122.403482,37.777529],[-122.387436,37.777529],[-122.387436,37.794486],[-122.403482,37.794486]]]},"attributes":{}},"contributors":null,"retweet_count":316,"entities":{"hashtags":[],"urls":[],"user_mentions":[]},"favorited":false,"retweeted":false} +{"created_at":"Wed Apr 06 19:13:37 +0000 2011","id":55709764298092545,"id_str":"55709764298092545","text":"The problem with your code is that it's doing exactly what you told it to do.","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":7505382,"id_str":"7505382","name":"Erik Michaels-Ober","screen_name":"sferik","location":"San Francisco","description":"Write code. Not too much. Mostly Ruby.","url":"https:\/\/github.com\/sferik","entities":{"url":{"urls":[{"url":"https:\/\/github.com\/sferik","expanded_url":null,"indices":[0,25]}]},"description":{"urls":[]}},"protected":false,"followers_count":2479,"friends_count":200,"listed_count":132,"created_at":"Mon Jul 16 12:59:01 +0000 2007","favourites_count":4421,"utc_offset":-28800,"time_zone":"Pacific Time (US & Canada)","geo_enabled":true,"verified":false,"statuses_count":8730,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"000000","profile_background_image_url":"http:\/\/a0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_tile":false,"profile_image_url":"http:\/\/a0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_banner_url":"https:\/\/si0.twimg.com\/profile_banners\/7505382\/1349499693","profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"default_profile":false,"default_profile_image":false,"following":false,"follow_request_sent":false,"notifications":false},"geo":{"type":"Point","coordinates":[37.78349999,-122.39362884]},"coordinates":{"type":"Point","coordinates":[-122.39362884,37.78349999]},"place":{"id":"5c92ab5379de3839","url":"https:\/\/api.twitter.com\/1.1\/geo\/id\/5c92ab5379de3839.json","place_type":"neighborhood","name":"South Beach","full_name":"South Beach, San Francisco","country_code":"US","country":"United States","bounding_box":{"type":"Polygon","coordinates":[[[-122.403482,37.777529],[-122.387436,37.777529],[-122.387436,37.794486],[-122.403482,37.794486]]]},"attributes":{}},"contributors":null,"retweet_count":316,"entities":{"hashtags":[],"urls":[],"user_mentions":[]},"favorited":false,"retweeted":false} + diff --git a/spec/twitter/streaming/client_spec.rb b/spec/twitter/streaming/client_spec.rb new file mode 100644 index 000000000..3daebb99e --- /dev/null +++ b/spec/twitter/streaming/client_spec.rb @@ -0,0 +1,31 @@ +require 'helper' + +describe Twitter::Streaming::Client do + before do + @client = Twitter::Streaming::Client.new + end + + class FakeConnection + include Celluloid::IO + def initialize(body) + @body = body + end + + def stream(request, response) + # TODO: assert request is valid + @body.each_line do |line| + response.on_body(line) + end + end + end + + it "supports tracking keywords" do + @client.connection = FakeConnection.new(fixture("track_streaming.json")) + + tweets = [] + @client.track("india") do |tweet| + tweets << tweet + end + expect(tweets).to have(2).entries + end +end diff --git a/twitter.gemspec b/twitter.gemspec index 28b25a84c..67a3ae7c2 100644 --- a/twitter.gemspec +++ b/twitter.gemspec @@ -4,8 +4,11 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) require 'twitter/version' Gem::Specification.new do |spec| + spec.add_dependency 'buftok', '~> 0.1.0' spec.add_dependency 'celluloid', '~> 0.14.0' + spec.add_dependency 'celluloid-io', '~> 0.14.0' spec.add_dependency 'faraday', ['~> 0.8', '< 0.10'] + spec.add_dependency 'http_parser.rb', '~> 0.5' spec.add_dependency 'simple_oauth', '~> 0.2' spec.add_development_dependency 'bundler', '~> 1.0' spec.authors = ["Erik Michaels-Ober", "John Nunemaker", "Wynn Netherland", "Steve Richert", "Steve Agalloco"]