diff --git a/tools/incremental-update/incremental.py b/tools/incremental-update/incremental.py index e5fe5e5..cfccd1d 100644 --- a/tools/incremental-update/incremental.py +++ b/tools/incremental-update/incremental.py @@ -1,21 +1,48 @@ +import sys import redis import time +import hashlib class ZDBIncremental: - def __init__(self, master, slave): - self.master = redis.Redis(unix_socket_path="/tmp/zdb.sock") - self.slave = redis.Redis(port=9900) + def __init__(self, master, mport, slave, sport): + self.minfo = {"host": master, "port": mport} + self.sinfo = {"host": slave, "port": sport} + + self.master = redis.Redis(host=master, port=mport) + self.slave = redis.Redis(host=slave, port=sport) # disable defaults callbacks for target in [self.master, self.slave]: - target.set_response_callback("NSINFO", redis.client.parse_info) - target.set_response_callback("DEL", redis.client.bool_ok) + target.set_response_callback("NSINFO", redis._parsers.helpers.parse_info) + target.set_response_callback("DEL", redis._parsers.helpers.bool_ok) target.set_response_callback("SET", bytes) + target.set_response_callback("AUTH", bytes) + + def authenticate(self, target, password): + print(f"[+] authenticating") + + request = target.execute_command("AUTH", "SECURE", "CHALLENGE") + challenge = request.decode('utf-8') + + encoded = f"{challenge}:{password}" + response = hashlib.sha1(encoded.encode("utf-8")).hexdigest() + + status = target.execute_command("AUTH", "SECURE", response) + + return status == b"OK" def sync(self, master, slave): - raw = self.master.execute_command("DATA", "RAW", slave['dataid'], slave['offset']) + try: + raw = self.master.execute_command("DATA", "RAW", slave['dataid'], slave['offset']) - print(raw) + except redis.exceptions.ResponseError as e: + if str(e) == "EOF": + self.slave.execute_command("NSJUMP") + return None + + raise e + + # print(raw) if raw[3] == 0: # SET @@ -31,6 +58,10 @@ def sync(self, master, slave): def run(self): namespace = "default" + print(f"[+] master host: {self.minfo['host']}, port: {self.minfo['port']}") + print(f"[+] slave host: {self.sinfo['host']}, port: {self.sinfo['port']}") + print(f"[+] syncing namespace: {namespace}") + while True: master = {} slave = {} @@ -38,26 +69,35 @@ def run(self): nsmaster = self.master.execute_command("NSINFO", namespace) master['dataid'] = int(nsmaster['data_current_id']) master['offset'] = int(nsmaster['data_current_offset']) + master['size'] = int(nsmaster['data_size_bytes']) nsslave = self.slave.execute_command("NSINFO", namespace) slave['dataid'] = int(nsslave['data_current_id']) slave['offset'] = int(nsslave['data_current_offset']) - - print("master: %d:%d" % (master['dataid'], master['offset'])) - print("slave: %d:%d" % (slave['dataid'], slave['offset'])) + slave['size'] = int(nsslave['data_size_bytes']) if slave['dataid'] > master['dataid']: raise RuntimeError("slave ahead from master") if master['dataid'] == slave['dataid']: if master['offset'] == slave['offset']: - print("sync, waiting") + sys.stdout.write("\r[+] syncing: %.2f / %.2f MB (%.1f %%), waiting changes \033[K" % (ssize, msize, progress)) time.sleep(10) continue + msize = master['size'] / 1024 / 1024 + ssize = slave['size'] / 1024 / 1024 + progress = (slave['size'] / master['size']) * 100 + dataid = slave['dataid'] + offset = slave['offset'] + + sys.stdout.write("\r[+] syncing: %.2f / %.2f MB (%.1f %%) [request %d:%d] \033[K" % (ssize, msize, progress, dataid, offset)) + sys.stdout.flush() + self.sync(master, slave) if __name__ == '__main__': - incremental = ZDBIncremental("hello", "world") + incremental = ZDBIncremental("hub.grid.tf", 9900, "127.0.0.1", 9900) + # incremental.authenticate(incremental.master, "set-password-here") incremental.run()