Permalink
Browse files

Implement Twitter::Streaming::Client

  • Loading branch information...
1 parent 67de147 commit 23afe90aa494229a4389c3e51f753102b34fc551 @sferik committed Aug 11, 2013
View
@@ -7,6 +7,8 @@ gem 'yard'
group :development do
gem 'kramdown'
gem 'pry'
+ gem 'pry-rescue'
+ gem 'pry-stack_explorer'
gem 'pry-debugger', :platforms => :mri_19
end
View
@@ -22,6 +22,7 @@
require 'twitter/settings'
require 'twitter/size'
require 'twitter/source_user'
+require 'twitter/streaming/client'
require 'twitter/suggestion'
require 'twitter/target_user'
require 'twitter/trend'
@@ -0,0 +1,75 @@
+require 'twitter/client'
+require 'twitter/streaming/connection'
+require 'twitter/streaming/proxy'
+require 'twitter/streaming/request'
+require 'twitter/streaming/response'
+
+module Twitter
+ module Streaming
+ class Client < Twitter::Client
+ attr_writer :connection
+
+ def initialize
+ super
+ @connection = Twitter::Streaming::Connection.new
+ @request_options = {
+ :auto_reconnect => true,
+ :content_type => 'application/x-www-form-urlencoded',
+ :headers => {},
+ :oauth => credentials,
+ :port => 443,
+ :ssl => true,
+ :timeout => 0,
+ :user_agent => user_agent,
+ }
+ end
+
+ def user(&block)
+ user!(&block).value
+ end
+
+ def user!(&block)
+ request({
+ :method => 'GET',
+ :host => 'userstream.twitter.com',
+ :path => '/1.1/user.json',
+ :params => {},
+ }) do |data|
+ begin
+ block.call(Tweet.new(data))
+ rescue StandardError => error
+ p(error)
+ end
+ end
+ end
+
+ def track(*keywords, &block)
+ track!(*keywords, &block).value
+ end
+
+ def track!(*keywords, &block)
+ options = {
+ :method => 'POST',
+ :host => 'stream.twitter.com',
+ :path => '/1.1/statuses/filter.json',
+ :params => {'track' => keywords.join(',')},
+ }
+ request(options) do |data|
+ begin
+ block.call(Tweet.new(data))
+ rescue StandardError => error
+ p(error)
+ end
+ end
+ end
+
+ def request(options, &block)
+ # TODO: consider HTTP::Request
+ request = Twitter::Streaming::Request.new(@request_options.merge(options))
+ response = Twitter::Streaming::Response.new(block)
+ @connection.future.stream(request, response)
+ end
+
+ end
+ end
+end
@@ -0,0 +1,29 @@
+require 'celluloid/io'
+require 'http/parser'
+require 'openssl'
+require 'resolv'
+
+module Twitter
+ module Streaming
+ class Connection
+ include Celluloid::IO
+
+ def stream(request, response)
+ client_context = OpenSSL::SSL::SSLContext.new
+ parser = Http::Parser.new(response)
+ client = Celluloid::IO::TCPSocket.new(Resolv.getaddress(request.host), request.port)
+ ssl_client = Celluloid::IO::SSLSocket.new(client, client_context)
+ ssl_client.connect
+ # TODO: HTTP::Request#stream
+ ssl_client.write(request.to_s)
+
+ while body = ssl_client.readpartial(1024)
+ parser << body
+ end
+ rescue EOFError
+ puts "Stream ended"
+ end
+
+ end
+ end
+end
@@ -0,0 +1,25 @@
+module Twitter
+ module Streaming
+ class Proxy
+
+ attr_reader :user, :password, :uri
+
+ def initialize(options = {})
+ @user = options.delete(:user)
+ @password = options.delete(:password)
+ @uri = options.delete(:uri)
+ end
+
+ def header
+ ["#{@user}:#{@password}"].pack('m').delete("\r\n") if credentials?
+ end
+
+ private
+
+ def credentials?
+ @user && @password
+ end
+
+ end
+ end
+end
@@ -0,0 +1,134 @@
+require 'uri'
+require 'simple_oauth'
+
+module Twitter
+ module Streaming
+ class Request
+ attr_reader :proxy, :options
+
+ def initialize(options = {})
+ @options = options
+ @proxy = Proxy.new(@options.delete(:proxy)) if @options[:proxy]
+ end
+
+ def host
+ options[:host]
+ end
+
+ def port
+ options[:port]
+ end
+
+ def to_s
+ content = query
+
+ data = []
+ data << "#{request_method} #{request_uri} HTTP/1.1"
+ data << "Host: #{@options[:host]}"
+
+ if gzip?
+ data << 'Connection: Keep-Alive'
+ data << 'Accept-Encoding: deflate, gzip'
+ else
+ data << 'Accept: */*'
+ end
+
+ data << "User-Agent: #{@options[:user_agent]}" if @options[:user_agent]
+ if put_or_post?
+ data << "Content-Type: #{@options[:content_type]}"
+ data << "Content-Length: #{content.bytesize}"
+ end
+ data << "Authorization: #{oauth_header}" if oauth?
+ data << "Proxy-Authorization: Basic #{proxy.header}" if proxy?
+
+ @options[:headers].each do |name, value|
+ data << "#{name}: #{value}"
+ end
+
+ data << "\r\n"
+ data = data.join("\r\n")
+ data << content if post? || put?
+ data
+ end
+
+ def proxy?
+ @proxy
+ end
+
+ private
+
+ def get?
+ request_method == 'GET'
+ end
+
+ def post?
+ request_method == 'POST'
+ end
+
+ def put?
+ request_method == 'PUT'
+ end
+
+ def put_or_post?
+ put? || post?
+ end
+
+ def gzip?
+ @options[:encoding] && @options[:encoding] == 'gzip'
+ end
+
+ def request_method
+ @options[:method].to_s.upcase
+ end
+
+ def params
+ flat = {}
+ @options[:params].each do |param, val|
+ next if val.to_s.empty? || (val.respond_to?(:empty?) && val.empty?)
+ val = val.join(",") if val.respond_to?(:join)
+ flat[param.to_s] = val.to_s
+ end
+ flat
+ end
+
+ def query
+ params.map do |param, value|
+ [param, SimpleOAuth::Header.encode(value)].join("=")
+ end.sort.join("&")
+ end
+
+ def oauth?
+ @options[:oauth] && !@options[:oauth].empty?
+ end
+
+ def oauth_header
+ SimpleOAuth::Header.new(@options[:method], full_uri, params, @options[:oauth])
+ end
+
+ def proxy_uri
+ "#{uri_base}:#{@options[:port]}#{path}"
+ end
+
+ def request_uri
+ proxy? ? proxy_uri : path
+ end
+
+ def path
+ get? ? "#{@options[:path]}?#{query}" : @options[:path]
+ end
+
+ def uri_base
+ "#{protocol}://#{@options[:host]}"
+ end
+
+ def protocol
+ @options[:ssl] ? 'https' : 'http'
+ end
+
+ def full_uri
+ proxy? ? proxy_uri : "#{uri_base}#{request_uri}"
+ end
+
+ end
+ end
+end
@@ -0,0 +1,25 @@
+require 'buftok'
+
+module Twitter
+ module Streaming
+ class Response
+ def initialize(block)
+ @block = block
+ @tokenizer = BufferedTokenizer.new("\r\n")
+ end
+
+ def on_headers_complete(headers)
+ puts headers
+ # handle response codes
+ end
+
+ def on_body(data)
+ @tokenizer.extract(data).each do |line|
+ next if line.empty?
+ @block.call(JSON.parse(line, :symbolize_names => true))
+ end
+ end
+
+ end
+ end
+end
@@ -0,0 +1,3 @@
+{"created_at":"Wed Apr 06 19:13:37 +0000 2011","id":55709764298092545,"id_str":"55709764298092545","text":"The problem with your code is that it's doing exactly what you told it to do.","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":7505382,"id_str":"7505382","name":"Erik Michaels-Ober","screen_name":"sferik","location":"San Francisco","description":"Write code. Not too much. Mostly Ruby.","url":"https:\/\/github.com\/sferik","entities":{"url":{"urls":[{"url":"https:\/\/github.com\/sferik","expanded_url":null,"indices":[0,25]}]},"description":{"urls":[]}},"protected":false,"followers_count":2479,"friends_count":200,"listed_count":132,"created_at":"Mon Jul 16 12:59:01 +0000 2007","favourites_count":4421,"utc_offset":-28800,"time_zone":"Pacific Time (US & Canada)","geo_enabled":true,"verified":false,"statuses_count":8730,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"000000","profile_background_image_url":"http:\/\/a0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_tile":false,"profile_image_url":"http:\/\/a0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_banner_url":"https:\/\/si0.twimg.com\/profile_banners\/7505382\/1349499693","profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"default_profile":false,"default_profile_image":false,"following":false,"follow_request_sent":false,"notifications":false},"geo":{"type":"Point","coordinates":[37.78349999,-122.39362884]},"coordinates":{"type":"Point","coordinates":[-122.39362884,37.78349999]},"place":{"id":"5c92ab5379de3839","url":"https:\/\/api.twitter.com\/1.1\/geo\/id\/5c92ab5379de3839.json","place_type":"neighborhood","name":"South Beach","full_name":"South Beach, San Francisco","country_code":"US","country":"United States","bounding_box":{"type":"Polygon","coordinates":[[[-122.403482,37.777529],[-122.387436,37.777529],[-122.387436,37.794486],[-122.403482,37.794486]]]},"attributes":{}},"contributors":null,"retweet_count":316,"entities":{"hashtags":[],"urls":[],"user_mentions":[]},"favorited":false,"retweeted":false}
+{"created_at":"Wed Apr 06 19:13:37 +0000 2011","id":55709764298092545,"id_str":"55709764298092545","text":"The problem with your code is that it's doing exactly what you told it to do.","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":7505382,"id_str":"7505382","name":"Erik Michaels-Ober","screen_name":"sferik","location":"San Francisco","description":"Write code. Not too much. Mostly Ruby.","url":"https:\/\/github.com\/sferik","entities":{"url":{"urls":[{"url":"https:\/\/github.com\/sferik","expanded_url":null,"indices":[0,25]}]},"description":{"urls":[]}},"protected":false,"followers_count":2479,"friends_count":200,"listed_count":132,"created_at":"Mon Jul 16 12:59:01 +0000 2007","favourites_count":4421,"utc_offset":-28800,"time_zone":"Pacific Time (US & Canada)","geo_enabled":true,"verified":false,"statuses_count":8730,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"000000","profile_background_image_url":"http:\/\/a0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_image_url_https":"https:\/\/si0.twimg.com\/profile_background_images\/677717672\/bb0b3653dcf0644e344823e0a2eb3382.png","profile_background_tile":false,"profile_image_url":"http:\/\/a0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/1759857427\/image1326743606_normal.png","profile_banner_url":"https:\/\/si0.twimg.com\/profile_banners\/7505382\/1349499693","profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"default_profile":false,"default_profile_image":false,"following":false,"follow_request_sent":false,"notifications":false},"geo":{"type":"Point","coordinates":[37.78349999,-122.39362884]},"coordinates":{"type":"Point","coordinates":[-122.39362884,37.78349999]},"place":{"id":"5c92ab5379de3839","url":"https:\/\/api.twitter.com\/1.1\/geo\/id\/5c92ab5379de3839.json","place_type":"neighborhood","name":"South Beach","full_name":"South Beach, San Francisco","country_code":"US","country":"United States","bounding_box":{"type":"Polygon","coordinates":[[[-122.403482,37.777529],[-122.387436,37.777529],[-122.387436,37.794486],[-122.403482,37.794486]]]},"attributes":{}},"contributors":null,"retweet_count":316,"entities":{"hashtags":[],"urls":[],"user_mentions":[]},"favorited":false,"retweeted":false}
+
@@ -0,0 +1,31 @@
+require 'helper'
+
+describe Twitter::Streaming::Client do
+ before do
+ @client = Twitter::Streaming::Client.new
+ end
+
+ class FakeConnection
+ include Celluloid::IO
+ def initialize(body)
+ @body = body
+ end
+
+ def stream(request, response)
+ # TODO: assert request is valid
+ @body.each_line do |line|
+ response.on_body(line)
+ end
+ end
+ end
+
+ it "supports tracking keywords" do
+ @client.connection = FakeConnection.new(fixture("track_streaming.json"))
+
+ tweets = []
+ @client.track("india") do |tweet|
+ tweets << tweet
+ end
+ expect(tweets).to have(2).entries
+ end
+end
View
@@ -4,7 +4,11 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'twitter/version'
Gem::Specification.new do |spec|
+ spec.add_dependency 'buftok', '~> 0.1.0'
+ spec.add_dependency 'celluloid', '~> 0.14.0'
+ spec.add_dependency 'celluloid-io', '~> 0.14.0'
spec.add_dependency 'faraday', ['~> 0.8', '< 0.10']
+ spec.add_dependency 'http_parser.rb', '~> 0.5'
spec.add_dependency 'simple_oauth', '~> 0.2'
spec.add_development_dependency 'bundler', '~> 1.0'
spec.authors = ["Erik Michaels-Ober", "John Nunemaker", "Wynn Netherland", "Steve Richert", "Steve Agalloco"]

0 comments on commit 23afe90

Please sign in to comment.