Skip to content

Commit

Permalink
refactor: make some functions private
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Mar 18, 2021
1 parent 7e93b64 commit e02254e
Showing 1 changed file with 10 additions and 17 deletions.
27 changes: 10 additions & 17 deletions jina/peapods/zmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ def __init__(
self.msg_sent = 0
self.is_closed = False
self.opened_socks = [] # this must be here for `close()`
self.ctx, self.in_sock, self.out_sock, self.ctrl_sock = self.init_sockets()
self.register_pollin()
self.ctx, self.in_sock, self.out_sock, self.ctrl_sock = self._init_sockets()
self._register_pollin()

self.opened_socks.extend([self.in_sock, self.out_sock, self.ctrl_sock])
if self.in_sock_type == zmq.DEALER:
self.send_idle()
self._send_idle()

def register_pollin(self):
def _register_pollin(self):
"""Register :attr:`in_sock`, :attr:`ctrl_sock` and :attr:`out_sock` (if :attr:`out_sock_type` is zmq.ROUTER) in poller."""
self.poller = zmq.Poller()
self.poller.register(self.in_sock, zmq.POLLIN)
Expand Down Expand Up @@ -121,12 +121,12 @@ def _pull(self, interval: int = 1):
elif socks.get(self.in_sock) == zmq.POLLIN:
return self.in_sock

def close_sockets(self):
def _close_sockets(self):
"""Close input, output and control sockets of this `Zmqlet`. """
for k in self.opened_socks:
k.close()

def init_sockets(self) -> Tuple:
def _init_sockets(self) -> Tuple:
"""Initialize all sockets and the ZMQ context.
:return: A tuple of four pieces:
Expand Down Expand Up @@ -217,7 +217,7 @@ def close(self):
"""
if not self.is_closed:
self.is_closed = True
self.close_sockets()
self._close_sockets()
if hasattr(self, 'ctx'):
self.ctx.term()
self.print_stats()
Expand Down Expand Up @@ -255,9 +255,9 @@ def send_message(self, msg: 'Message'):
self.msg_sent += 1

if o_sock == self.out_sock and self.in_sock_type == zmq.DEALER:
self.send_idle()
self._send_idle()

def send_idle(self):
def _send_idle(self):
"""Tell the upstream router this dealer is idle """
msg = ControlMessage('IDLE', pod_name=self.name, identity=self.identity)
self.bytes_sent += send_message(self.in_sock, msg, **self.send_recv_kwargs)
Expand All @@ -280,13 +280,6 @@ def recv_message(
if callback:
return callback(msg)

def clear_stats(self):
"""Reset the internal counter of send and receive bytes to zero. """
self.bytes_recv = 0
self.bytes_sent = 0
self.msg_recv = 0
self.msg_sent = 0


class AsyncZmqlet(Zmqlet):
"""An async vesion of :class:`Zmqlet`.
Expand Down Expand Up @@ -349,7 +342,7 @@ class ZmqStreamlet(Zmqlet):
It requires :mod:`tornado` and :mod:`uvloop` to be installed.
"""

def register_pollin(self):
def _register_pollin(self):
"""Register :attr:`in_sock`, :attr:`ctrl_sock` and :attr:`out_sock` in poller."""
with ImportExtensions(required=True):
import tornado.ioloop
Expand Down

0 comments on commit e02254e

Please sign in to comment.