/
streams.rb
215 lines (193 loc) · 8.04 KB
/
streams.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# frozen_string_literal: true
# :markup: markdown
module ActionCable
module Channel
# # Action Cable Channel Streams
#
# Streams allow channels to route broadcastings to the subscriber. A
# broadcasting is, as discussed elsewhere, a pubsub queue where any data placed
# into it is automatically sent to the clients that are connected at that time.
# It's purely an online queue, though. If you're not streaming a broadcasting at
# the very moment it sends out an update, you will not get that update, even if
# you connect after it has been sent.
#
# Most commonly, the streamed broadcast is sent straight to the subscriber on
# the client-side. The channel just acts as a connector between the two parties
# (the broadcaster and the channel subscriber). Here's an example of a channel
# that allows subscribers to get all new comments on a given page:
#
# class CommentsChannel < ApplicationCable::Channel
# def follow(data)
# stream_from "comments_for_#{data['recording_id']}"
# end
#
# def unfollow
# stop_all_streams
# end
# end
#
# Based on the above example, the subscribers of this channel will get whatever
# data is put into the, let's say, `comments_for_45` broadcasting as soon as
# it's put there.
#
# An example broadcasting for this channel looks like so:
#
# ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }
#
# If you have a stream that is related to a model, then the broadcasting used
# can be generated from the model and channel. The following example would
# subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE`.
#
# class CommentsChannel < ApplicationCable::Channel
# def subscribed
# post = Post.find(params[:id])
# stream_for post
# end
# end
#
# You can then broadcast to this channel using:
#
# CommentsChannel.broadcast_to(@post, @comment)
#
# If you don't just want to parlay the broadcast unfiltered to the subscriber,
# you can also supply a callback that lets you alter what is sent out. The below
# example shows how you can use this to provide performance introspection in the
# process:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
#
# stream_for @room, coder: ActiveSupport::JSON do |message|
# if message['originated_at'].present?
# elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
# ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
# logger.info "Message took #{elapsed_time}s to arrive"
# end
#
# transmit message
# end
# end
# end
#
# You can stop streaming from all broadcasts by calling #stop_all_streams.
module Streams
extend ActiveSupport::Concern
included do
on_unsubscribe :stop_all_streams
end
# Start streaming from the named `broadcasting` pubsub queue. Optionally, you
# can pass a `callback` that'll be used instead of the default of just
# transmitting the updates straight to the subscriber. Pass `coder:
# ActiveSupport::JSON` to decode messages as JSON before passing to the
# callback. Defaults to `coder: nil` which does no decoding, passes raw
# messages.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
# Build a stream handler by wrapping the user-provided callback with a decoder
# or defaulting to a JSON-decoding retransmitter.
handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
streams[broadcasting] = handler
connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do
ensure_confirmation_sent
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
end
# Start streaming the pubsub queue for the `model` in this channel. Optionally,
# you can pass a `callback` that'll be used instead of the default of just
# transmitting the updates straight to the subscriber.
#
# Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to
# the callback. Defaults to `coder: nil` which does no decoding, passes raw
# messages.
def stream_for(model, callback = nil, coder: nil, &block)
stream_from(broadcasting_for(model), callback || block, coder: coder)
end
# Unsubscribes streams from the named `broadcasting`.
def stop_stream_from(broadcasting)
callback = streams.delete(broadcasting)
if callback
pubsub.unsubscribe(broadcasting, callback)
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end
end
# Unsubscribes streams for the `model`.
def stop_stream_for(model)
stop_stream_from(broadcasting_for(model))
end
# Unsubscribes all streams associated with this channel from the pubsub queue.
def stop_all_streams
streams.each do |broadcasting, callback|
pubsub.unsubscribe broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end.clear
end
# Calls stream_for with the given `model` if it's present to start streaming,
# otherwise rejects the subscription.
def stream_or_reject_for(model)
if model
stream_for model
else
reject
end
end
private
delegate :pubsub, to: :connection
def streams
@_streams ||= {}
end
# Always wrap the outermost handler to invoke the user handler on the worker
# pool rather than blocking the event loop.
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
handler = stream_handler(broadcasting, user_handler, coder: coder)
-> message do
connection.worker_pool.async_invoke handler, :call, message, connection: connection
end
end
# May be overridden to add instrumentation, logging, specialized error handling,
# or other forms of handler decoration.
#
# TODO: Tests demonstrating this.
def stream_handler(broadcasting, user_handler, coder: nil)
if user_handler
stream_decoder user_handler, coder: coder
else
default_stream_handler broadcasting, coder: coder
end
end
# May be overridden to change the default stream handling behavior which decodes
# JSON and transmits to the client.
#
# TODO: Tests demonstrating this.
#
# TODO: Room for optimization. Update transmit API to be coder-aware so we can
# no-op when pubsub and connection are both JSON-encoded. Then we can skip
# decode+encode if we're just proxying messages.
def default_stream_handler(broadcasting, coder:)
coder ||= ActiveSupport::JSON
stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
end
def stream_decoder(handler = identity_handler, coder:)
if coder
-> message { handler.(coder.decode(message)) }
else
handler
end
end
def stream_transmitter(handler = identity_handler, broadcasting:)
via = "streamed from #{broadcasting}"
-> (message) do
transmit handler.(message), via: via
end
end
def identity_handler
-> message { message }
end
end
end
end