Skip to content
Permalink
Browse files

Stop using Storage() in the hb drivers

Storage() is not rendered as expected in json.dumps, which is
annoying.
  • Loading branch information...
cvaroqui committed Jul 13, 2019
1 parent 4b800a8 commit 21cc1fdb492b93df3cfdeccdd550926988233276
Showing with 38 additions and 47 deletions.
  1. +3 −8 lib/hb.py
  2. +15 −16 lib/hb_disk.py
  3. +2 −3 lib/hb_mcast.py
  4. +3 −4 lib/hb_relay.py
  5. +15 −16 lib/hb_ucast.py
@@ -54,10 +54,10 @@ def reset_stats(self):

def status(self, **kwargs):
data = shared.OsvcThread.status(self, **kwargs)
data.peers = {}
data["peers"] = {}
for nodename in self.hb_nodes:
if nodename == rcEnv.nodename:
data.peers[nodename] = {}
data["peers"][nodename] = {}
continue
if "*" in self.peers:
_data = self.peers["*"]
@@ -67,7 +67,7 @@ def status(self, **kwargs):
"beating": False,
"success": True,
}))
data.peers[nodename] = {
data["peers"][nodename] = {
"last": _data.last,
"beating": _data.beating,
}
@@ -235,11 +235,6 @@ def store_rx_data(self, data, nodename):
try:
json_delta.patch(shared.CLUSTER_DATA[nodename], deltas[str(gen)])
current_gen = gen
shared.EVENT_Q.put({
"nodename": nodename,
"kind": "patch",
"data": deltas[str(gen)],
})
shared.REMOTE_GEN[nodename] = gen
shared.LOCAL_GEN[nodename] = our_gen_on_peer
shared.CLUSTER_DATA[nodename]["gen"] = {
@@ -13,7 +13,6 @@
import osvcd_shared as shared
import rcExceptions as ex
from rcGlobalEnv import rcEnv
from storage import Storage
from hb import Hb
from rcUtilities import bdecode

@@ -32,13 +31,13 @@ class HbDisk(Hb):

def status(self, **kwargs):
data = Hb.status(self, **kwargs)
data.stats = Storage(self.stats)
data.config = {
data["stats"] = self.stats
data["config"] = {
"dev": self.dev,
"timeout": self.timeout,
}
for peer in data.peers:
data.peers[peer].update(self.peer_config.get(peer, {}))
for peer in data["peers"]:
data["peers"][peer].update(self.peer_config.get(peer, {}))
return data

def configure(self):
@@ -166,9 +165,9 @@ def write_slot(self, slot, data, fo=None):
def load_peer_config(self, fo=None, verbose=True):
for nodename in self.hb_nodes:
if nodename not in self.peer_config:
self.peer_config[nodename] = Storage({
self.peer_config[nodename] = {
"slot": -1,
})
}
for slot in range(self.MAX_SLOTS):
buff = self.meta_read_slot(slot, fo=fo)
if buff is None or buff[0] == "\0":
@@ -179,12 +178,12 @@ def load_peer_config(self, fo=None, verbose=True):
continue
if nodename not in self.peer_config:
continue
if self.peer_config[nodename].slot >= 0 and \
slot != self.peer_config[nodename].slot:
if self.peer_config[nodename]["slot"] >= 0 and \
slot != self.peer_config[nodename]["slot"]:
if verbose:
self.log.warning("duplicate slot %d for node %s (first %d)",
slot, nodename,
self.peer_config[nodename].slot)
self.peer_config[nodename]["slot"])
continue
if verbose:
self.log.info("detect slot %d for node %s", slot, nodename)
@@ -196,7 +195,7 @@ def allocate_slot(self):
buff = self.meta_read_slot(slot, fo=fo)
if buff is None or buff[0] != "\0":
continue
self.peer_config[rcEnv.nodename].slot = slot
self.peer_config[rcEnv.nodename]["slot"] = slot
try:
nodename = bytes(rcEnv.nodename, "utf-8")
except TypeError:
@@ -215,7 +214,7 @@ def __init__(self, name):

def _configure(self):
HbDisk._configure(self)
if self.peer_config[rcEnv.nodename].slot < 0:
if self.peer_config[rcEnv.nodename]["slot"] < 0:
self.allocate_slot()

def run(self):
@@ -246,7 +245,7 @@ def _do(self, fo):
self.reload_config()
if rcEnv.nodename not in self.peer_config:
return
slot = self.peer_config[rcEnv.nodename].slot
slot = self.peer_config[rcEnv.nodename]["slot"]
if slot < 0:
return
message, message_bytes = self.get_message()
@@ -317,10 +316,10 @@ def _do(self, fo):
for nodename, data in self.peer_config.items():
if nodename == rcEnv.nodename:
continue
if data.slot < 0:
if data["slot"] < 0:
continue
try:
slot_data = json.loads(self.read_slot(data.slot, fo=fo))
slot_data = json.loads(self.read_slot(data["slot"], fo=fo))
_nodename, _data = self.decrypt(slot_data["msg"])
if _nodename is None:
# invalid crypt
@@ -348,7 +347,7 @@ def _do(self, fo):
self.push_stats()
if self.get_last(nodename).success:
self.log.error("read from %s slot %d (%s) error: %s", self.dev,
data.slot, nodename, str(exc))
data["slot"], nodename, str(exc))
self.set_last(nodename, success=False)
finally:
self.set_beating(nodename)
@@ -12,7 +12,6 @@
import rcExceptions as ex
import osvcd_shared as shared
from rcGlobalEnv import rcEnv
from storage import Storage
from rcUtilities import chunker, bdecode
from hb import Hb

@@ -34,8 +33,8 @@ class HbMcast(Hb):

def status(self, **kwargs):
data = Hb.status(self, **kwargs)
data.stats = Storage(self.stats)
data.config = {
data["stats"] = self.stats
data["config"] = {
"addr": self.addr,
"port": self.port,
"intf": self.intf,
@@ -9,7 +9,6 @@
import osvcd_shared as shared
import rcExceptions as ex
from rcGlobalEnv import rcEnv
from storage import Storage
from hb import Hb

class HbRelay(Hb):
@@ -19,12 +18,12 @@ class HbRelay(Hb):
"""
def status(self, **kwargs):
data = Hb.status(self, **kwargs)
data.stats = Storage(self.stats)
data.config = {
data["stats"] = self.stats
data["config"] = {
"timeout": self.timeout,
}
if hasattr(self, "relay"):
data.config["relay"] = self.relay
data["config"]["relay"] = self.relay
return data

def configure(self):
@@ -10,7 +10,6 @@
import rcExceptions as ex
import osvcd_shared as shared
from rcGlobalEnv import rcEnv
from storage import Storage
from hb import Hb

class HbUcast(Hb):
@@ -24,10 +23,10 @@ class HbUcast(Hb):

def status(self, **kwargs):
data = Hb.status(self, **kwargs)
data.stats = Storage(self.stats)
data.config = {
"addr": self.peer_config[rcEnv.nodename].addr,
"port": self.peer_config[rcEnv.nodename].port,
data["stats"]= self.stats
data["config"] = {
"addr": self.peer_config[rcEnv.nodename]["addr"],
"port": self.peer_config[rcEnv.nodename]["port"],
"timeout": self.timeout,
}
return data
@@ -58,10 +57,10 @@ def _configure(self):
addr = "0.0.0.0"
else:
addr = nodename
peer_config[nodename] = Storage({
peer_config[nodename] = {
"addr": addr,
"port": port,
})
}

if peer_config != self.peer_config:
self.config_change = True
@@ -118,25 +117,25 @@ def do(self):

def _do(self, message, message_bytes, nodename, config):
try:
#self.log.info("sending to %s:%s", config.addr, config.port)
#self.log.info("sending to %s:%s", config["addr"], config["port"])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.bind((self.peer_config[rcEnv.nodename].addr, 0))
sock.connect((config.addr, config.port))
sock.bind((self.peer_config[rcEnv.nodename]["addr"], 0))
sock.connect((config["addr"], config["port"]))
sock.sendall((message+"\0").encode())
self.set_last(nodename)
self.push_stats(message_bytes)
except socket.timeout as exc:
self.push_stats()
if self.get_last(nodename).success:
self.log.warning("send to %s (%s:%d) timeout", nodename,
config.addr, config.port)
config["addr"], config["port"])
self.set_last(nodename, success=False)
except socket.error as exc:
self.push_stats()
if self.get_last(nodename).success:
self.log.warning("send to %s (%s:%d) error: %s", nodename,
config.addr, config.port, str(exc))
config["addr"], config["port"], str(exc))
self.set_last(nodename, success=False)
finally:
self.set_beating(nodename)
@@ -171,8 +170,8 @@ def _configure(self):
def configure_listener(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.peer_config[rcEnv.nodename].addr,
self.peer_config[rcEnv.nodename].port))
self.sock.bind((self.peer_config[rcEnv.nodename]["addr"],
self.peer_config[rcEnv.nodename]["port"]))
self.sock.listen(5)
self.sock.settimeout(2)

@@ -184,8 +183,8 @@ def run(self):
return

self.log.info("listening on %s:%s",
self.peer_config[rcEnv.nodename].addr,
self.peer_config[rcEnv.nodename].port)
self.peer_config[rcEnv.nodename]["addr"],
self.peer_config[rcEnv.nodename]["port"])

while True:
self.do()

0 comments on commit 21cc1fd

Please sign in to comment.
You can’t perform that action at this time.