This repository has been archived by the owner on Jul 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 205
/
Copy pathhandler.rb
121 lines (94 loc) · 3.02 KB
/
handler.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# Handler class.
# Handles a client connected via a websocket connection.
require 'active_support/core_ext/hash'
require 'securerandom'
require 'signature'
require 'fiber'
require 'rack'
require 'oj'
module Slanger
  # Handles a single client connected via a websocket connection.
  #
  # The handshake is authenticated when the handler is constructed; after
  # that, incoming frames are routed by #onmessage and the client's channel
  # subscriptions are released by #onclose.
  class Handler
    # Prefix carried by every method that handles a `pusher:*` event.
    # `pusher:ping` is dispatched to #pusher_ping, and so on.
    PUSHER_EVENT_PREFIX = 'pusher_'.freeze

    attr_accessor :connection
    delegate :error, :send_payload, to: :connection

    # Wraps the socket in a Connection and immediately authenticates the
    # handshake (see #authenticate).
    #
    # @param socket the websocket; closed again if the handshake is rejected
    # @param handshake the handshake request; `path` and `query_string` are
    #   read from it
    def initialize(socket, handshake)
      @socket        = socket
      @handshake     = handshake
      @connection    = Connection.new(@socket)
      @subscriptions = {}
      authenticate
    end

    # Parses an incoming frame and routes it by event name:
    #
    # * `client-*` events are stamped with this connection's socket id and
    #   handed to Channel.send_client_message;
    # * `pusher:<name>` events are dispatched to the `pusher_<name>` method,
    #   if this handler has one;
    # * anything else is ignored.
    #
    # Only `pusher_`-prefixed methods are reachable from the wire. The event
    # name is supplied by the client, so dispatching on it unrestricted would
    # let a client invoke arbitrary (including private) methods.
    #
    # Failures are reported to the client through #error: code 5001 for
    # malformed JSON, code 500 (message plus backtrace) for any other
    # StandardError.
    #
    # @param msg [String] raw JSON frame
    def onmessage(msg)
      msg = Oj.load(msg)
      # The payload may itself be a JSON-encoded string; unwrap it once.
      msg['data'] = Oj.load(msg['data']) if msg['data'].is_a? String

      event = msg['event'].sub(/\Apusher:/, PUSHER_EVENT_PREFIX)

      if event.start_with?('client-')
        msg['socket_id'] = connection.socket_id
        Channel.send_client_message msg
      elsif event.start_with?(PUSHER_EVENT_PREFIX) && respond_to?(event, true)
        send event, msg
      end
    rescue Oj::ParseError, JSON::ParserError
      # Oj reports malformed input with its own error class; JSON::ParserError
      # is kept for setups where Oj raises the JSON-compatible class.
      error({ code: 5001, message: "Invalid JSON" })
    rescue StandardError => e
      # StandardError rather than Exception, so signals and SystemExit are
      # never swallowed here.
      error({ code: 500, message: "#{e.message}\n #{Array(e.backtrace).join "\n"}" })
    end

    # Unsubscribes this client from every channel it still holds a
    # subscription on. Entries with a nil channel id or nil subscription id
    # are skipped.
    def onclose
      # `select` returns a copy, so the loop never iterates @subscriptions
      # itself.
      live = @subscriptions.select { |channel_id, subscription_id| channel_id && subscription_id }
      live.each do |channel_id, subscription_id|
        Channel.unsubscribe channel_id, subscription_id
      end
    end

    # Validates the handshake. An unknown app key is rejected with code 4001
    # and an unsupported protocol version with code 4007; in both cases the
    # websocket is closed. Otherwise the connection is established.
    #
    # @return the result of `connection.establish` on success
    def authenticate
      if !valid_app_key?(app_key)
        reject_handshake 4001, "Could not find app by key #{app_key}"
      elsif !valid_protocol_version?
        reject_handshake 4007, "Unsupported protocol version"
      else
        connection.establish
      end
    end

    # @return [Boolean] whether the requested protocol version lies in 3..7
    def valid_protocol_version?
      protocol_version.between?(3, 7)
    end

    # Answers a `pusher:ping` with a `pusher:pong`.
    def pusher_ping(_msg)
      send_payload nil, 'pusher:pong'
    end

    # `pusher:pong` needs no reply; defined so the event is accepted silently.
    def pusher_pong(_msg); end

    # Subscribes this client to the channel named in `msg['data']['channel']`.
    # A second subscription to the same channel is refused with an error
    # payload instead of being created.
    #
    # @param msg [Hash] parsed `pusher:subscribe` message
    def pusher_subscribe(msg)
      channel_id = msg['data']['channel']
      klass      = subscription_klass channel_id

      if @subscriptions[channel_id]
        error({ code: nil, message: "Existing subscription to #{channel_id}" })
      else
        # Whatever #subscribe returns is kept as the subscription id.
        # NOTE(review): #onclose filters nil ids, so #subscribe can presumably
        # return nil (e.g. on a rejected subscription) - confirm.
        @subscriptions[channel_id] = klass.new(connection.socket, connection.socket_id, msg).subscribe
      end
    end

    # Drops this client's subscription to the channel named in
    # `msg['data']['channel']`. Does nothing when the client holds no
    # subscription for that channel, mirroring the filter in #onclose.
    #
    # @param msg [Hash] parsed `pusher:unsubscribe` message
    def pusher_unsubscribe(msg)
      channel_id      = msg['data']['channel']
      subscription_id = @subscriptions.delete(channel_id)
      return unless subscription_id

      Channel.unsubscribe channel_id, subscription_id
    end

    private

    # Reports a handshake failure to the client, then closes the websocket.
    def reject_handshake(code, message)
      error({ code: code, message: message })
      @socket.close_websocket
    end

    # The app key is the third piece of the handshake path when split on
    # non-word characters, e.g. "KEY" for "/app/KEY".
    def app_key
      @handshake.path.split(/\W/)[2]
    end

    # Protocol version requested through the `protocol` query parameter.
    #
    # @return [Integer] the parsed version; 0 when the parameter is missing
    #   or not numeric; -1 when it was parsed into a value with no integer
    #   form (nested parameters such as `protocol[]=7` become an Array/Hash)
    def protocol_version
      @query_string ||= Rack::Utils.parse_nested_query(@handshake.query_string)
      raw = @query_string["protocol"]
      raw.respond_to?(:to_i) ? raw.to_i : -1
    end

    # @return [Boolean] whether `app_key` equals the configured app key
    def valid_app_key?(app_key)
      Slanger::Config.app_key == app_key
    end

    # Picks the subscription class for a channel: `private-*` and
    # `presence-*` channel ids resolve to the Slanger constant named after
    # the prefix (via `"<prefix>_subscription".classify`); every other id
    # uses Slanger::Subscription.
    def subscription_klass(channel_id)
      klass = channel_id.match(/\A(private|presence)-/) do |match|
        Slanger.const_get "#{match[1]}_subscription".classify
      end

      klass || Slanger::Subscription
    end
  end
end