forked from voloko/twitter-stream
/
json_stream.rb
341 lines (288 loc) · 9.09 KB
/
json_stream.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
require 'eventmachine'
require 'em/buftok'
require 'uri'
require 'simple_oauth'
require 'http/parser'
module EventMachine
class JSONStream < EventMachine::Connection
# Size limit handed to BufferedTokenizer when the line buffer is created
# (see reset_state).
MAX_LINE_LENGTH = 1024 * 1024

# Delays used by reconnect_timeout after a network failure (@code is still 0):
# begin at START, grow by ADD on each attempt, and never return more than MAX.
NF_RECONNECT_START = 0.25
NF_RECONNECT_ADD   = 0.25
NF_RECONNECT_MAX   = 16

# Delays used by reconnect_timeout after any other failure: begin at START and
# multiply by MUL on each attempt.
AF_RECONNECT_START = 10
AF_RECONNECT_MUL   = 2

# schedule_reconnect stops retrying once the computed delay exceeds
# RECONNECT_MAX or the retry counter exceeds RETRIES_MAX.
RECONNECT_MAX = 320
RETRIES_MAX   = 10

# Settings that connect/initialize layer underneath the caller's options.
DEFAULT_OPTIONS = {
  :method         => 'GET',
  :path           => '/',
  :content_type   => 'application/x-www-form-urlencoded',
  :content        => '',
  :host           => 'localhost',
  :port           => 80,
  :ssl            => false,
  :user_agent     => 'EM-HTTP-Stream',
  :timeout        => 0,
  :proxy          => ENV['HTTP_PROXY'],
  :auth           => nil,
  :oauth          => {},
  :filters        => [],
  :params         => {},
  :auto_reconnect => true
}
# Public accessors for the last HTTP status code and headers, the reconnect
# bookkeeping (last network/app failure delays, retry count) and the proxy URI.
attr_accessor :code, :headers,
              :nf_last_reconnect, :af_last_reconnect, :reconnect_retries,
              :proxy
# Opens a connection through EventMachine.connect and returns it.
#
# options - Hash layered over DEFAULT_OPTIONS. When :ssl is set and no :port
#           is supplied, the port becomes 443. When :proxy is set, the socket
#           is opened to the proxy's host and port instead of :host/:port.
#
# The hash passed in by the caller is never modified. start_tls is invoked on
# the new connection when :ssl is set.
def self.connect(options = {})
  # Build a new hash for the implied TLS port instead of writing into the
  # caller's hash (which may be shared or frozen).
  options = options.merge(:port => 443) if options[:ssl] && !options.has_key?(:port)
  options = DEFAULT_OPTIONS.merge(options)

  host = options[:host]
  port = options[:port]
  if options[:proxy]
    proxy_uri = URI.parse(options[:proxy])
    host = proxy_uri.host
    port = proxy_uri.port
  end

  connection = EventMachine.connect host, port, self, options
  connection.start_tls if options[:ssl]
  connection
end
# Stores the connection settings and clears all reconnect bookkeeping.
#
# options - Hash of settings; DEFAULT_OPTIONS is applied underneath it again so
#           the instance is fully configured even when built without connect.
def initialize(options = {})
  @options = DEFAULT_OPTIONS.merge(options)

  # :on_inited is removed from the hash that was passed in; the copy merged
  # into @options above is unaffected.
  @on_inited_callback = options.delete(:on_inited)

  # The proxy is read from the passed-in hash, not from the merged defaults.
  proxy_url = options[:proxy]
  @proxy = URI.parse(proxy_url) if proxy_url

  @reconnect_retries = 0
  @nf_last_reconnect = nil
  @af_last_reconnect = nil
  @gracefully_closed = false
  @immediate_reconnect = false
end
# Registers the block that receives every complete JSON item (as a String).
def each_item(&handler)
  @each_item_callback = handler
end

# Registers the block that receives an error message String.
def on_error(&handler)
  @error_callback = handler
end

# Registers the block called with (timeout, retries) before each reconnect.
def on_reconnect(&handler)
  @reconnect_callback = handler
end

# Registers the block called with (timeout, retries) when reconnecting is
# abandoned.
def on_max_reconnects(&handler)
  @max_reconnects_callback = handler
end

# Registers the block called (without arguments) from unbind.
def on_close(&handler)
  @close_callback = handler
end
# Closes the connection and marks the close as intentional, so unbind will not
# schedule a reconnect.
def stop
  @gracefully_closed = true
  close_connection()
end

# Closes the connection and flags the next reconnect to happen with a zero
# delay (see reconnect_timeout). Clears any earlier graceful-close mark.
def immediate_reconnect
  @gracefully_closed = false
  @immediate_reconnect = true
  close_connection()
end
# EventMachine connection callback, run when the connection has closed.
# Parses whatever is left in the line buffer (only while streaming), schedules
# a reconnect unless the close was requested via stop or :auto_reconnect is
# off, and finally fires the on_close callback.
def unbind
  leftover_pending = @state == :stream && !@buffer.empty?
  parse_stream_line(@buffer.flush) if leftover_pending

  wants_reconnect = @options[:auto_reconnect] && !@gracefully_closed
  schedule_reconnect if wants_reconnect

  @close_callback.call if @close_callback
end
# Feeds raw bytes read from the connection into the HTTP parser; the parser
# callbacks installed in reset_state take over from there.
def receive_data(data)
  @parser << data
end

# Sends the HTTP request once the connection is established.
def connection_completed
  send_request
end

# Prepares fresh per-connection state, then fires the optional :on_inited
# callback that was passed to the constructor.
def post_init
  reset_state
  inited = @on_inited_callback
  inited.call if inited
end
protected
# Counts one more retry and either schedules the reconnect or, when the delay
# exceeds RECONNECT_MAX or the retry count exceeds RETRIES_MAX, gives up and
# notifies the on_max_reconnects callback with (delay, retries).
def schedule_reconnect
  delay = reconnect_timeout
  @reconnect_retries += 1

  if delay > RECONNECT_MAX || @reconnect_retries > RETRIES_MAX
    @max_reconnects_callback.call(delay, @reconnect_retries) if @max_reconnects_callback
  else
    reconnect_after(delay)
  end
end
# Notifies the on_reconnect callback with (timeout, retries) and then redials:
# immediately when timeout is 0, otherwise from an EventMachine timer after
# +timeout+.
#
# The endpoint is the proxy's host/port when a proxy is configured, matching
# what connect dials for the first connection; send_request keeps emitting a
# proxy-style absolute URI, so redialling the origin host directly would send
# that request to the wrong peer.
#
# NOTE(review): start_tls is only issued by connect; nothing visible here
# restarts TLS after a reconnect when :ssl is set — confirm.
def reconnect_after(timeout)
  @reconnect_callback.call(timeout, @reconnect_retries) if @reconnect_callback

  host, port = if @proxy
                 [@proxy.host, @proxy.port]
               else
                 [@options[:host], @options[:port]]
               end

  if timeout == 0
    reconnect host, port
  else
    EventMachine.add_timer(timeout) do
      reconnect host, port
    end
  end
end
# Returns the delay to wait before the next reconnect and advances the
# matching back-off counter.
#
# * A pending immediate_reconnect request yields 0 and is consumed.
# * With @code still 0 (no HTTP status was received) the network-failure delay
#   grows additively and the returned value is capped at NF_RECONNECT_MAX.
# * Otherwise the application-failure delay grows multiplicatively.
def reconnect_timeout
  if @immediate_reconnect
    @immediate_reconnect = false
    return 0
  end

  if @code == 0
    @nf_last_reconnect = @nf_last_reconnect ? @nf_last_reconnect + NF_RECONNECT_ADD : NF_RECONNECT_START
    return [@nf_last_reconnect, NF_RECONNECT_MAX].min
  end

  @af_last_reconnect = @af_last_reconnect ? @af_last_reconnect * AF_RECONNECT_MUL : AF_RECONNECT_START
  @af_last_reconnect
end
# Puts the connection back into its pre-response state: applies the configured
# inactivity timeout (only when positive), clears status/headers/stream
# buffers, and installs a fresh HTTP parser wired to this object's handlers.
def reset_state
  idle_timeout = @options[:timeout]
  set_comm_inactivity_timeout idle_timeout if idle_timeout > 0

  @state = :init
  @code = 0
  @headers = {}
  @stream = ''
  @buffer = BufferedTokenizer.new("\n", MAX_LINE_LENGTH)

  @parser = Http::Parser.new
  @parser.on_headers_complete = method(:handle_headers_complete)
  @parser.on_body = method(:receive_stream_data)
end
# Parser callback, invoked once the status line and all headers have been
# read. Records the status code and headers and switches to streaming mode.
#
# A 200 response clears the reconnect back-off state via reset_timeouts;
# without that the delays and retry counter only ever grow, so a long-lived
# client would hit RETRIES_MAX after a handful of unrelated disconnects.
# Any other status is reported through the on_error callback.
#
# headers - the header collection handed over by the parser.
def handle_headers_complete(headers)
  @code = @parser.status_code.to_i
  if @code == 200
    reset_timeouts
  else
    receive_error("invalid status code: #{@code}.")
  end
  self.headers = headers
  @state = :stream
end
# Parser callback, invoked for every chunk of body data after the headers
# have been processed. Splits the chunk into lines, hands each one to
# parse_stream_line and then clears the partial-item accumulator.
#
# Errors raised while processing (including ones from the each_item callback)
# are reported through on_error and the connection is closed. Only
# StandardError is handled: Interrupt, SystemExit and similar must propagate
# so the process can still be stopped while an item is being handled.
def receive_stream_data(data)
  @buffer.extract(data).each do |line|
    parse_stream_line(line)
  end
  @stream = ''
rescue StandardError => e
  receive_error("#{e.class}: " + [e.message, e.backtrace].flatten.join("\n\t"))
  close_connection
  nil
end
# Builds the HTTP/1.1 request from @options and writes it to the connection.
#
# * The query string (see query) goes into the request URI for GET and into
#   the body for every other method.
# * Through a proxy the request line carries the absolute URL.
# * Authorization is Basic when :auth is set, otherwise OAuth when :oauth
#   holds credentials; an empty :oauth hash (the default) sends no header.
# * Content-type/Content-length are sent for POST; the length is in bytes.
# * :headers, when present, are appended verbatim.
def send_request
  # Compare and emit the method in one normalized form so 'post' or :post
  # behaves exactly like 'POST'.
  http_method = @options[:method].to_s.upcase

  # Work on a copy: appending the query must not modify @options[:path],
  # otherwise every reconnect would append it again and oauth_header would
  # sign a URL that already contains the query.
  request_uri = @options[:path].to_s.dup
  if @proxy
    # proxies need the request to be for the full url
    request_uri = "#{uri_base}:#{@options[:port]}#{request_uri}"
  end

  content = @options[:content]
  unless (q = query).empty?
    if http_method == 'GET'
      request_uri << "?#{q}"
    else
      content = q
    end
  end
  content = content.to_s

  data = []
  data << "#{http_method} #{request_uri} HTTP/1.1"
  data << "Host: #{@options[:host]}"
  data << 'Accept: */*'
  data << "User-Agent: #{@options[:user_agent]}" if @options[:user_agent]

  oauth = @options[:oauth]
  if @options[:auth]
    data << "Authorization: Basic #{[@options[:auth]].pack('m').delete("\r\n")}"
  elsif oauth && !oauth.empty?
    data << "Authorization: #{oauth_header}"
  end

  if @proxy && @proxy.user
    data << "Proxy-Authorization: Basic " + ["#{@proxy.user}:#{@proxy.password}"].pack('m').delete("\r\n")
  end

  if http_method == 'POST'
    data << "Content-type: #{@options[:content_type]}"
    # Content-Length counts octets, not characters.
    data << "Content-length: #{content.bytesize}"
  end

  if @options[:headers]
    @options[:headers].each do |name, value|
      data << "#{name}: #{value}"
    end
  end

  # Joining with the trailing "\r\n" element yields the blank line that ends
  # the header section.
  data << "\r\n"
  send_data data.join("\r\n") << content
end
# Forwards an error message to the on_error callback, if one is registered.
def receive_error(e)
  handler = @error_callback
  handler.call(e) if handler
end
# Handles one line of the stream body.
#
# The line is stripped in place. Blank lines, and lines that neither begin
# with '{' nor end with '}', are ignored. Anything else is appended to the
# @stream accumulator; once the accumulator both begins with '{' and ends
# with '}' it is handed to the each_item callback and a new accumulator is
# started.
def parse_stream_line(ln)
  ln.strip!
  return if ln.empty?
  return unless ln.start_with?('{') || ln.end_with?('}')

  @stream << ln
  return unless @stream.start_with?('{') && @stream.end_with?('}')

  @each_item_callback.call(@stream) if @each_item_callback
  @stream = ''
end
# Re-applies the configured inactivity timeout (only when positive) and clears
# the reconnect back-off state: both failure delays and the retry counter.
def reset_timeouts
  idle_timeout = @options[:timeout]
  set_comm_inactivity_timeout idle_timeout if idle_timeout > 0

  @nf_last_reconnect = nil
  @af_last_reconnect = nil
  @reconnect_retries = 0
end
#
# URL and request components
#
# Options consumed by the methods below, for example:
#
#   :filters => %w(miama lebron jesus)
#   :oauth   => {
#     :consumer_key    => [key],
#     :consumer_secret => [token],
#     :access_key      => [access key],
#     :access_secret   => [access secret]
#   }

# Builds the SimpleOAuth header object for the configured method, URL (scheme,
# host and path) and request parameters.
def oauth_header
  credentials = @options[:oauth]

  # SimpleOAuth names the token fields differently from the option keys this
  # class has always accepted, so translate them here to keep callers'
  # configuration unchanged.
  signing_options = {
    :consumer_key    => credentials[:consumer_key],
    :consumer_secret => credentials[:consumer_secret],
    :token           => credentials[:access_key],
    :token_secret    => credentials[:access_secret]
  }

  target = uri_base + @options[:path].to_s
  SimpleOAuth::Header.new(@options[:method], target, params, signing_options)
end
# Returns "<scheme>://<host>", where the scheme is https when :ssl is set and
# http otherwise. The port is not included.
def uri_base
  scheme = @options[:ssl] ? 'https' : 'http'
  "#{scheme}://#{@options[:host]}"
end
# Returns the request parameters as a flat Hash of String keys to String
# values (not yet escaped; query does the escaping).
#
# * :params supplies the base parameters; a missing/nil :params is treated as
#   empty.
# * A non-empty :filters is sent as the "track" parameter. Empty or nil
#   filters contribute nothing, so a :track given in :params is kept.
# * nil and empty values are skipped; values that respond to join (arrays)
#   are joined with commas.
def params
  merged = @options[:params] || {}

  filters = @options[:filters]
  filters_blank = filters.nil? || (filters.respond_to?(:empty?) && filters.empty?)
  merged = merged.merge(:track => filters) unless filters_blank

  flat = {}
  merged.each do |param, val|
    next if val.to_s.empty? || (val.respond_to?(:empty?) && val.empty?)
    val = val.join(',') if val.respond_to?(:join)
    flat[param.to_s] = val.to_s
  end
  flat
end
# Returns the request parameters as an escaped "key=value" query string, with
# the pairs sorted and joined by "&". Empty when there are no parameters.
def query
  pairs = params.map { |name, val| "#{escape(name)}=#{escape(val)}" }
  pairs.sort.join('&')
end
# Percent-encodes +str+ (converted with to_s): every character other than
# ASCII letters, digits, "-", ".", "_" and "~" is replaced by the %XX form of
# each of its bytes, in upper-case hex.
#
# Implemented directly because URI.escape, which this method used to call, no
# longer exists in Ruby 3.0 and later.
def escape(str)
  str.to_s.gsub(/[^a-zA-Z0-9\-\.\_\~]/) do |unsafe|
    unsafe.unpack('C*').map { |byte| '%%%02X' % byte }.join
  end
end
end
end