-
Notifications
You must be signed in to change notification settings - Fork 32
/
subscriptions.rb
124 lines (95 loc) · 3.37 KB
/
subscriptions.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# frozen_string_literal: true
module LiteCable
class UnknownChannelError < StandardError
attr_reader :channel_id
def initialize(channel_id)
@channel_id = channel_id
super("Unknown channel: #{channel_id}")
end
end
module Connection
# Manage the connection channels and route messages
class Subscriptions
class Error < StandardError; end
class AlreadySubscribedError < Error; end
class UnknownCommandError < Error; end
class ChannelNotFoundError < Error; end
include Logging
def initialize(connection)
@connection = connection
@subscriptions = {}
end
def identifiers
subscriptions.keys
end
def add(identifier, subscribe = true)
raise AlreadySubscribedError if find(identifier)
params = connection.coder.decode(identifier)
channel_id = params.delete("channel")
channel_class = LiteCable.channel_registry.lookup(channel_id)
raise UnknownChannelError, channel_id unless channel_class
subscriptions[identifier] = channel_class.new(connection, identifier, params)
subscribe ? subscribe_channel(subscriptions[identifier]) : subscriptions[identifier]
end
def remove(identifier)
channel = find!(identifier)
subscriptions.delete(identifier)
channel.handle_unsubscribe
log(:debug) { log_fmt("Unsubscribed from channel #{channel.class.id}") }
transmit_subscription_cancel(channel.identifier)
end
def remove_all
subscriptions.keys.each(&method(:remove))
end
def perform_action(identifier, data)
channel = find!(identifier)
channel.handle_action data
end
def execute_command(data)
command = data.delete("command")
case command
when "subscribe" then add(data["identifier"])
when "unsubscribe" then remove(data["identifier"])
when "message" then perform_action(data["identifier"], data["data"])
else
raise UnknownCommandError, "Command not found #{command}"
end
end
def find(identifier)
subscriptions[identifier]
end
def find!(identifier)
channel = find(identifier)
raise ChannelNotFoundError unless channel
channel
end
private
attr_reader :connection, :subscriptions
def subscribe_channel(channel)
channel.handle_subscribe
log(:debug) { log_fmt("Subscribed to channel #{channel.class.id}") }
transmit_subscription_confirmation(channel.identifier)
channel
rescue Channel::RejectedError
subscriptions.delete(channel.identifier)
transmit_subscription_rejection(channel.identifier)
nil
end
def transmit_subscription_confirmation(identifier)
connection.transmit identifier: identifier,
type: LiteCable::INTERNAL[:message_types][:confirmation]
end
def transmit_subscription_rejection(identifier)
connection.transmit identifier: identifier,
type: LiteCable::INTERNAL[:message_types][:rejection]
end
def transmit_subscription_cancel(identifier)
connection.transmit identifier: identifier,
type: LiteCable::INTERNAL[:message_types][:cancel]
end
def log_fmt(msg)
"[connection:#{connection.identifier}] #{msg}"
end
end
end
end