Skip to content

Commit

Permalink
Fix up basic ping between master and minion
Browse files Browse the repository at this point in the history
  • Loading branch information
dwoz committed Aug 9, 2023
1 parent f53a2c4 commit fc41af1
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 59 deletions.
16 changes: 6 additions & 10 deletions salt/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def publish_server(opts, **kwargs):
import salt.transport.local

return salt.transport.local.LocalPubServerChannel(opts, **kwargs)
raise Exception("Transport type not found: {}".format(ttype))
raise Exception(f"Transport type not found: {ttype}")


def publish_client(opts, io_loop, host=None, port=None, path=None, transport=None):
Expand All @@ -130,14 +130,14 @@ def publish_client(opts, io_loop, host=None, port=None, path=None, transport=Non
return salt.transport.tcp.TCPPubClient(
opts, io_loop, host=host, port=port, path=path
)
elif ttype == "tcp":
elif ttype == "ws":
import salt.transport.ws

return salt.transport.ws.PublishClient(
opts, io_loop, host=host, port=port, path=path
)

raise Exception("Transport type not found: {}".format(ttype))
raise Exception(f"Transport type not found: {ttype}")


def _minion_hash(hash_type, minion_id):
Expand Down Expand Up @@ -173,9 +173,7 @@ def ipc_publish_client(node, opts, io_loop):
minion_id=opts.get("hash_id", opts["id"]),
)
kwargs.update(
path=os.path.join(
opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
path=os.path.join(opts["sock_dir"], f"minion_event_{id_hash}_pub.ipc")
)
return publish_client(opts, io_loop, **kwargs)

Expand Down Expand Up @@ -209,13 +207,11 @@ def ipc_publish_server(node, opts):
hash_type=opts["hash_type"],
minion_id=opts.get("hash_id", opts["id"]),
)
pub_path = os.path.join(
opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
)
pub_path = os.path.join(opts["sock_dir"], f"minion_event_{id_hash}_pub.ipc")
kwargs.update(
pub_path=pub_path,
pull_path=os.path.join(
opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
opts["sock_dir"], f"minion_event_{id_hash}_pull.ipc"
),
)
return publish_server(opts, **kwargs)
Expand Down
8 changes: 3 additions & 5 deletions salt/transport/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import tornado.netutil
import tornado.tcpclient
import tornado.tcpserver
import tornado.util

import salt.master
import salt.payload
Expand All @@ -33,6 +34,7 @@
import salt.utils.files
import salt.utils.msgpack
import salt.utils.platform
import salt.utils.process
import salt.utils.versions
from salt.exceptions import SaltClientError, SaltReqTimeoutError
from salt.utils.network import ip_bracket
Expand All @@ -42,10 +44,6 @@
else:
USE_LOAD_BALANCER = False

if USE_LOAD_BALANCER:
import tornado.util

from salt.utils.process import SignalHandlingProcess

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -134,7 +132,7 @@ def _set_tcp_keepalive(sock, opts):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 0)


class LoadBalancerServer(SignalHandlingProcess):
class LoadBalancerServer(salt.utils.process.SignalHandlingProcess):
"""
Raw TCP server which runs in its own process and will listen
for incoming connections. Each incoming connection will be
Expand Down
74 changes: 30 additions & 44 deletions salt/transport/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def recv(self, timeout=None):
try:
await self._read_in_progress.acquire(timeout=0.001)
except tornado.gen.TimeoutError:
log.error("Timeout Error")
log.error("Unable to acquire read lock")
return
try:
if timeout == 0:
Expand Down Expand Up @@ -213,10 +213,8 @@ async def recv(self, timeout=None):
if raw_msg.data == "close":
await self._ws.close()
if raw_msg.type == aiohttp.WSMsgType.BINARY:
log.error("ORIG MSG IS %r", raw_msg.data)
self.unpacker.feed(raw_msg.data)
for msg in self.unpacker:
log.error("MSG IS %r", msg)
framed_msg = salt.transport.frame.decode_embedded_strs(msg)
return framed_msg["body"]
elif raw_msg.type == aiohttp.WSMsgType.ERROR:
Expand All @@ -231,14 +229,8 @@ async def handle_on_recv(self, callback):
while not self._ws:
await asyncio.sleep(0.003)
while True:
try:
msg = await self.recv()
except Exception: # pylint: disable=broad-except
# XXX
log.error("Other exception", exc_info=True)
else:
log.error("on recv got msg %r", msg)
callback(msg)
msg = await self.recv()
callback(msg)

def on_recv(self, callback):
"""
Expand Down Expand Up @@ -376,9 +368,7 @@ def pre_fork(self, process_manager):

async def handle_request(self, request):
ws = aiohttp.web.WebSocketResponse()
log.error("perpare request")
await ws.prepare(request)
log.error("request prepared")
self.clients.add(ws)
while True:
await asyncio.sleep(1)
Expand All @@ -387,7 +377,11 @@ def connect(self):
log.debug("Connect pusher %s", self.pull_path)
self.pub_sock = salt.utils.asynchronous.SyncWrapper(
_TCPPubServerPublisher,
(self.pull_path,),
(
self.pull_host,
self.pull_port,
self.pull_path,
),
loop_kwarg="io_loop",
)
self.pub_sock.connect()
Expand All @@ -404,7 +398,6 @@ async def publish_payload(self, package, *args):
payload = salt.transport.frame.frame_msg(package)
for ws in list(self.clients):
try:
log.error("Publish package %r %r", ws, payload)
await ws.send_bytes(payload)
except ConnectionResetError:
self.clients.discard(ws)
Expand Down Expand Up @@ -467,25 +460,18 @@ async def server():
io_loop.spawn_callback(server)

async def handle_message(self, request):
try:
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
log.error("got msg %r", msg)
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
if msg.type == aiohttp.WSMsgType.BINARY:
payload = salt.payload.loads(msg.data)
log.error("Handle message got %r", payload)
reply = await self.message_handler(payload)
log.error("Handle message reply %r", reply)
await ws.send_bytes(salt.payload.dumps(reply))
elif msg.type == aiohttp.WSMsgType.ERROR:
log.error("ws connection closed with exception %s", ws.exception())
except Exception: # pylint: disable=broad-except
# XXX
log.error("Message handler", exc_info=True)
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
if msg.type == aiohttp.WSMsgType.BINARY:
payload = salt.payload.loads(msg.data)
reply = await self.message_handler(payload)
await ws.send_bytes(salt.payload.dumps(reply))
elif msg.type == aiohttp.WSMsgType.ERROR:
log.error("ws connection closed with exception %s", ws.exception())

def close(self):
self._run.clear()
Expand All @@ -505,21 +491,20 @@ def __init__(self, opts, io_loop): # pylint: disable=W0231
self._closed = False

async def connect(self):
if self.session is None:
self.session = aiohttp.ClientSession()
URL = self.get_master_uri(self.opts)
self.ws = await self.session.ws_connect(URL)
# if self.session is None:
self.session = aiohttp.ClientSession()
URL = self.get_master_uri(self.opts)
self.ws = await self.session.ws_connect(URL)

async def send(self, load, timeout=60):
if self.sending:
if self.sending or self._closing:
await asyncio.sleep(0.03)
self.sending = True
try:
await self.connect()
message = salt.payload.dumps(load)
await self.ws.send_bytes(message)
async for msg in self.ws:
log.error("Got MSG %r", msg)
if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
break
data = salt.payload.loads(msg.data)
Expand All @@ -531,27 +516,28 @@ async def send(self, load, timeout=60):
async def _close(self):
if self.ws is not None:
await self.ws.close()
self.ws = None
if self.session is not None:
await self.session.close()
self.session = None
self._closed = True

def close(self):
if self._closing:
return
self._closing = True
self.io_loop.spawn_callback(self._close)
self.close_task = asyncio.create_task(self._close())

@staticmethod
def get_master_uri(opts):
if "master_uri" in opts:
return opts["master_uri"]
return opts["master_uri"].replace("tcp:", "http:", 1)
return f"http://{opts['master_ip']}:{opts['master_port']}/ws"

# pylint: disable=W1701
def __del__(self):
if not self._closing:
warnings.warn(
"unclosed publish client {self!r}", ResourceWarning, source=self
"Unclosed publish client {self!r}", ResourceWarning, source=self
)

# pylint: enable=W1701

0 comments on commit fc41af1

Please sign in to comment.