-
Notifications
You must be signed in to change notification settings - Fork 32
/
base.rb
159 lines (133 loc) · 3.93 KB
/
base.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# frozen_string_literal: true
module LiteCable
module Server
module ClientSocket
# Wrapper over web socket
class Base
include Logging
include Subscriptions
attr_reader :version, :active
def initialize(env, socket, version)
log(:debug, "WebSocket version #{version}")
@env = env
@socket = socket
@version = version
@active = true
@open_handlers = []
@message_handlers = []
@close_handlers = []
@error_handlers = []
@close_on_error = true
end
def prevent_close_on_error
@close_on_error = false
end
def transmit(data, type: :text)
frame = WebSocket::Frame::Outgoing::Server.new(
version: version,
data: data,
type: type
)
socket.write frame.to_s
rescue EOFError, Errno::ECONNRESET => e
log(:debug, "Socket gone: #{e}")
close
rescue IOError, Errno::EPIPE, Errno::ETIMEDOUT => e
log(:error, "Socket send failed: #{e}")
close
end
def request
@request ||= Rack::Request.new(@env)
end
def onopen(&block)
@open_handlers << block
end
def onmessage(&block)
@message_handlers << block
end
def onclose(&block)
@close_handlers << block
end
def onerror(&block)
@error_handlers << block
end
def listen
keepalive
Thread.new do
Thread.current.abort_on_exception = true
begin
@open_handlers.each(&:call)
each_frame do |data|
@message_handlers.each do |h|
h.call(data)
rescue => e
log(:error, "Socket receive failed: #{e}")
@error_handlers.each { |eh| eh.call(e, data) }
close if close_on_error
end
end
ensure
close
end
end
end
def close
return unless @active
@close_handlers.each(&:call)
close!
@active = false
end
def closed?
@socket.closed?
end
private
attr_reader :socket, :close_on_error
def close!
if @socket.respond_to?(:closed?)
close_socket unless @socket.closed?
else
close_socket
end
end
def close_socket
frame = WebSocket::Frame::Outgoing::Server.new(version: version, type: :close, code: 1000)
@socket.write(frame.to_s) if frame.supported?
@socket.close
rescue IOError, Errno::EPIPE, Errno::ETIMEDOUT, Errno::ECONNRESET
# already closed
end
def keepalive
thread = Thread.new do
Thread.current.abort_on_exception = true
loop do
sleep 5
transmit nil, type: :ping
end
end
onclose do
thread.kill
end
end
def each_frame
framebuffer = WebSocket::Frame::Incoming::Server.new(version: version)
while socket.wait_readable
data = socket.respond_to?(:recv) ? socket.recv(2000) : socket.readpartial(2000)
break if data.empty?
framebuffer << data
while frame = framebuffer.next # rubocop:disable Lint/AssignmentInCondition
case frame.type
when :close
return
when :text, :binary
yield frame.data
end
end
end
rescue Errno::EHOSTUNREACH, Errno::ETIMEDOUT, Errno::ECONNRESET, IOError, Errno::EBADF => e
log(:debug, "Socket frame error: #{e}")
nil # client disconnected or timed out
end
end
end
end
end