# nobrainer_sse_responder.rb
require 'thin/async'
module Responders::NobrainerSseResponder
# EventMachine handler that forwards RethinkDB changefeed notifications to a
# writer object (anything that responds to +write+).
#
# Each notification is written as a hash with +old_val+ and +new_val+ keys.
# The callbacks keep their exact arity on purpose.
# NOTE(review): the RethinkDB handler may dispatch on method arity -- confirm
# before adding parameters to any callback.
class DbHandler < RethinkDB::Handler
  delegate :write, to: '@writer'

  # @param writer [#write] sink that receives one hash per changefeed event
  def initialize(writer)
    # Run the base handler's initializer as well, so whatever state it sets
    # up exists before any callback fires.
    super()
    @writer = writer
  end

  # find all callbacks at https://rethinkdb.com/api/ruby/em_run/

  # Logs the error; nothing is written to the writer.
  #
  # @param err [#to_s] the error reported by the query
  def on_error(err)
    Rails.logger.error "SseResponder: #{err}"
  end

  # Writes an initial row; there is no previous value, so +old_val+ is nil.
  #
  # @param val the initial value
  def on_initial_val(val)
    write(old_val: nil, new_val: val)
  end

  # Writes a change as its previous and current values.
  #
  # @param old the value before the change
  # @param new the value after the change
  def on_change(old, new)
    write(old_val: old, new_val: new)
  end
end
# Seconds between keep-alive events, so that Heroku doesn't close the socket.
PING_INTERVAL = 25

# Streams the resource's changefeed to the client as Server-Sent Events.
#
# Sends the response headers immediately, hands every changefeed notification
# to an SSE writer whose default event name is 'row', and emits a 'ping' event
# every PING_INTERVAL seconds.
def to_sse
  async_response = Thin::AsyncResponse.new(request.env, 200, 'Content-Type' => 'text/event-stream')
  async_response.send_headers

  sse = ActionController::Live::SSE.new(async_response, retry: 300, event: 'row')
  handler = DbHandler.new(sse)
  changefeed = rethink_query.em_run(NoBrainer.connection.raw, handler)

  keepalive = EventMachine::PeriodicTimer.new(PING_INTERVAL) { sse.write('PING', event: 'ping') }

  # Intended to run when the client disconnects: stop pinging and close the
  # changefeed.
  # NOTE(review): confirm that Thin::AsyncResponse#callback really registers
  # this block. If it is a plain reader the block is silently ignored and the
  # timer and changefeed are never released.
  async_response.callback do
    keepalive.cancel
    changefeed.close
  end

  head(-1) # aka throw :async
end
private

# Builds the changefeed query for the current resource, asking for the
# initial values as well as subsequent changes (include_initial: true).
def rethink_query
  base_query = resource.to_rql
  base_query.changes(include_initial: true)
end
end