From 5e8d7dcefd49a8ffe9f1c027ed7a4e349ecdb896 Mon Sep 17 00:00:00 2001 From: Maxime Daniel Date: Thu, 21 Dec 2023 04:08:12 +0100 Subject: [PATCH 1/2] tools: incremental: improve incremental update --- tools/incremental-update/incremental.py | 48 ++++++++++++++++++------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/tools/incremental-update/incremental.py b/tools/incremental-update/incremental.py index e5fe5e5..a8fe1a7 100644 --- a/tools/incremental-update/incremental.py +++ b/tools/incremental-update/incremental.py @@ -1,21 +1,33 @@ +import sys import redis import time class ZDBIncremental: - def __init__(self, master, slave): - self.master = redis.Redis(unix_socket_path="/tmp/zdb.sock") - self.slave = redis.Redis(port=9900) + def __init__(self, master, mport, slave, sport): + self.minfo = {"host": master, "port": mport} + self.sinfo = {"host": slave, "port": sport} + + self.master = redis.Redis(host=master, port=mport) + self.slave = redis.Redis(host=slave, port=sport) # disable defaults callbacks for target in [self.master, self.slave]: - target.set_response_callback("NSINFO", redis.client.parse_info) - target.set_response_callback("DEL", redis.client.bool_ok) + target.set_response_callback("NSINFO", redis._parsers.helpers.parse_info) + target.set_response_callback("DEL", redis._parsers.helpers.bool_ok) target.set_response_callback("SET", bytes) def sync(self, master, slave): - raw = self.master.execute_command("DATA", "RAW", slave['dataid'], slave['offset']) + try: + raw = self.master.execute_command("DATA", "RAW", slave['dataid'], slave['offset']) + + except redis.exceptions.ResponseError as e: + if str(e) == "EOF": + self.slave.execute_command("NSJUMP") + return None - print(raw) + raise e + + # print(raw) if raw[3] == 0: # SET @@ -31,6 +43,10 @@ def sync(self, master, slave): def run(self): namespace = "default" + print(f"[+] master host: {self.minfo['host']}, port: {self.minfo['port']}") + print(f"[+] slave host: {self.sinfo['host']}, port: {self.sinfo['port']}") + print(f"[+] syncing namespace: {namespace}") + while True: master = {} slave = {} @@ -38,26 +54,34 @@ def run(self): nsmaster = self.master.execute_command("NSINFO", namespace) master['dataid'] = int(nsmaster['data_current_id']) master['offset'] = int(nsmaster['data_current_offset']) + master['size'] = int(nsmaster['data_size_bytes']) nsslave = self.slave.execute_command("NSINFO", namespace) slave['dataid'] = int(nsslave['data_current_id']) slave['offset'] = int(nsslave['data_current_offset']) - - print("master: %d:%d" % (master['dataid'], master['offset'])) - print("slave: %d:%d" % (slave['dataid'], slave['offset'])) + slave['size'] = int(nsslave['data_size_bytes']) if slave['dataid'] > master['dataid']: raise RuntimeError("slave ahead from master") if master['dataid'] == slave['dataid']: if master['offset'] == slave['offset']: - print("sync, waiting") + sys.stdout.write("\r[+] syncing: %.2f / %.2f MB (%.1f %%), waiting changes \033[K" % (ssize, msize, progress)) time.sleep(10) continue + msize = master['size'] / 1024 / 1024 + ssize = slave['size'] / 1024 / 1024 + progress = (slave['size'] / master['size']) * 100 + dataid = slave['dataid'] + offset = slave['offset'] + + sys.stdout.write("\r[+] syncing: %.2f / %.2f MB (%.1f %%) [request %d:%d] \033[K" % (ssize, msize, progress, dataid, offset)) + sys.stdout.flush() + self.sync(master, slave) if __name__ == '__main__': - incremental = ZDBIncremental("hello", "world") + incremental = ZDBIncremental("127.0.0.1", 9910, "127.0.0.1", 9900) incremental.run() From 87c47765646abbd0cdff39269b7e5d23d58bda6e Mon Sep 17 00:00:00 2001 From: Maxime Daniel Date: Tue, 9 Jan 2024 01:31:13 +0100 Subject: [PATCH 2/2] incremental: add authentication support --- tools/incremental-update/incremental.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/tools/incremental-update/incremental.py b/tools/incremental-update/incremental.py index a8fe1a7..cfccd1d 100644 --- a/tools/incremental-update/incremental.py +++ b/tools/incremental-update/incremental.py @@ -1,6 +1,7 @@ import sys import redis import time +import hashlib class ZDBIncremental: def __init__(self, master, mport, slave, sport): @@ -15,6 +16,20 @@ def __init__(self, master, mport, slave, sport): target.set_response_callback("NSINFO", redis._parsers.helpers.parse_info) target.set_response_callback("DEL", redis._parsers.helpers.bool_ok) target.set_response_callback("SET", bytes) + target.set_response_callback("AUTH", bytes) + + def authenticate(self, target, password): + print(f"[+] authenticating") + + request = target.execute_command("AUTH", "SECURE", "CHALLENGE") + challenge = request.decode('utf-8') + + encoded = f"{challenge}:{password}" + response = hashlib.sha1(encoded.encode("utf-8")).hexdigest() + + status = target.execute_command("AUTH", "SECURE", response) + + return status == b"OK" def sync(self, master, slave): try: @@ -83,5 +98,6 @@ def run(self): if __name__ == '__main__': - incremental = ZDBIncremental("127.0.0.1", 9910, "127.0.0.1", 9900) + incremental = ZDBIncremental("hub.grid.tf", 9900, "127.0.0.1", 9900) + # incremental.authenticate(incremental.master, "set-password-here") incremental.run()