-
Notifications
You must be signed in to change notification settings - Fork 2.6k
/
Copy pathpubsub.lua
128 lines (108 loc) · 3.24 KB
/
pubsub.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local ws_client = require "resty.websocket.client"
local protoc = require("protoc")
local pb = require("pb")
-- Module table returned by this file. Objects built by new_ws() resolve their
-- methods on it through the metatable below.
local _M = {}
local mt = { __index = _M }
-- protobuf state saved by load_proto() once pubsub.proto has been loaded;
-- passed back to pb.state() at the start of every send/receive helper.
local pb_state
-- Load the pubsub.proto definition and remember the resulting protobuf state
-- in the file-level `pb_state`.
--
-- Returns nothing on success. If loading the .proto file raises, the error
-- is logged and returned as the single result.
local function load_proto()
    -- Swap out the current pb state and reset protoc before loading.
    -- NOTE(review): exact semantics of pb.state(nil) are defined by
    -- lua-protobuf; confirm against its documentation.
    pb.state(nil)
    protoc.reload()
    pb.option("int64_as_string")

    local compiler = protoc.new()
    compiler:addpath("apisix/include/apisix/model")

    -- loadfile is called under pcall so a raised error is turned into a value.
    local loaded, load_err = pcall(compiler.loadfile, compiler, "pubsub.proto")
    if loaded then
        -- Keep whatever pb.state() hands back for later pb.state(pb_state).
        pb_state = pb.state(nil)
        return
    end

    ngx.log(ngx.ERR, "failed to load protocol: "..load_err)
    return load_err
end
-- Create a websocket client and connect it to `endpoint`.
--
-- Returns the connected client on success, or `nil, err` when either the
-- client cannot be created or the connection attempt fails (both failures
-- are logged at ERR level).
local function init_websocket_client(endpoint)
    local client, create_err = ws_client:new()
    if not client then
        ngx.log(ngx.ERR, "failed to create websocket client: "..create_err)
        return nil, create_err
    end

    local connected, connect_err = client:connect(endpoint)
    if connected then
        return client
    end

    ngx.log(ngx.ERR, "failed to connect: "..connect_err)
    return nil, connect_err
end
-- Build a pubsub test client that talks to `server` over websocket.
--
-- The protobuf schema is (re)loaded first, then the websocket connection is
-- opened. Returns the client object (fields `type = "ws"` and `ws_client`,
-- methods looked up on this module), or `nil, err` if either step fails.
function _M.new_ws(server)
    local proto_err = load_proto()
    if proto_err then
        return nil, proto_err
    end

    local client, conn_err = init_websocket_client(server)
    if not client then
        return nil, conn_err
    end

    return setmetatable({
        type = "ws",
        ws_client = client,
    }, mt)
end
-- Send one binary frame and decode the next received frame as "PubSubResp".
--
-- `data` is encoded as a "PubSubReq" message before sending unless `is_raw`
-- is truthy, in which case it is sent as-is.
-- Returns the decoded response, or `nil, err` when sending, receiving or
-- decoding fails (receive/decode failures are also logged).
function _M.send_recv_ws_binary(self, data, is_raw)
    -- Re-install the protobuf state saved when the schema was loaded.
    pb.state(pb_state)
    local client = self.ws_client

    local payload = data
    if not is_raw then
        payload = pb.encode("PubSubReq", data)
    end

    local _, send_err = client:send_binary(payload)
    if send_err then
        return nil, send_err
    end

    -- recv_frame yields three values; the middle one is not needed here.
    local frame, _, recv_err = client:recv_frame()
    if not frame then
        ngx.log(ngx.ERR, "failed to receive the frame: ", recv_err)
        return nil, recv_err
    end

    local resp, decode_err = pb.decode("PubSubResp", frame)
    if resp then
        return resp
    end

    ngx.log(ngx.ERR, "failed to decode the frame: ", decode_err)
    return nil, decode_err
end
-- Send `text` as a text frame and decode the next received frame as
-- "PubSubResp".
--
-- Returns the decoded response, or `nil, err` when sending, receiving or
-- decoding fails (receive/decode failures are also logged).
function _M.send_recv_ws_text(self, text)
    -- Re-install the protobuf state saved when the schema was loaded.
    pb.state(pb_state)
    local client = self.ws_client

    local _, send_err = client:send_text(text)
    if send_err then
        return nil, send_err
    end

    -- recv_frame yields three values; the middle one is not needed here.
    local frame, _, recv_err = client:recv_frame()
    if not frame then
        ngx.log(ngx.ERR, "failed to receive the frame: ", recv_err)
        return nil, recv_err
    end

    local resp, decode_err = pb.decode("PubSubResp", frame)
    if resp then
        return resp
    end

    ngx.log(ngx.ERR, "failed to decode the frame: ", decode_err)
    return nil, decode_err
end
-- Ask the underlying websocket client to send a close frame.
--
-- Returns whatever `send_close()` returns so callers can detect a failed
-- close instead of having the result silently dropped (per the
-- lua-resty-websocket docs this is `bytes, err`). Callers that ignore the
-- return value are unaffected.
function _M.close_ws(self)
    return self.ws_client:send_close()
end
-- Export the module table to whoever require()s this file.
return _M