Skip to content

Commit

Permalink
add cluster_helper.py to support syncing from cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
suxb201 committed Sep 23, 2022
1 parent e96ecdc commit ca66203
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 8 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
.idea
bin/*
data
__pycache__
bin
.DS_Store

*.log
*.rdb
*.aof
12 changes: 6 additions & 6 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ for g in "linux" "darwin"; do
done
done

cp sync.toml "$BIN_DIR"
cp restore.toml "$BIN_DIR"

if [ "$1" == "dist" ]; then
echo "[ DIST ]"
cd bin
cp -r ../filters ./
tar -czvf ./redis-shake.tar.gz ./sync.toml ./restore.toml ./redis-shake-* ./filters
cp sync.toml "$BIN_DIR"
cp restore.toml "$BIN_DIR"
cp -r filters "$BIN_DIR"
cp -r scripts/cluster_helper "$BIN_DIR"
cd "$BIN_DIR"
tar -czvf ./redis-shake.tar.gz ./sync.toml ./restore.toml ./redis-shake-* ./filters ./cluster_helper
rm -rf ./filters
cd ..
fi
1 change: 1 addition & 0 deletions cmd/redis-shake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func main() {

// start statistics
if config.Config.Advanced.MetricsPort != 0 {
statistics.Metrics.Address = config.Config.Source.Address
go func() {
log.Infof("metrics url: http://localhost:%d", config.Config.Advanced.MetricsPort)
mux := http.NewServeMux()
Expand Down
3 changes: 3 additions & 0 deletions internal/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
)

type metrics struct {
// info
Address string `json:"address"`

// entries
EntryId uint64 `json:"entry_id"`
AllowEntriesCount uint64 `json:"allow_entries_count"`
Expand Down
170 changes: 170 additions & 0 deletions scripts/cluster_helper/cluster_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python3
# encoding: utf-8
import datetime
import os
import shutil
import signal
import sys
import time
import requests
import toml
from pathlib import Path
import redis
from launcher import Launcher

USAGE = """
cluster_helper is a helper script to start many redis-shake for syncing from cluster.
Usage:
$ python3 cluster_helper.py ./bin/redis-shake sync.toml
"""

REDIS_SHAKE_PATH = ""
SLEEP_SECONDS = 5
stopped = False
toml_template = {}


class Shake:
def __init__(self):
self.metrics_port = 0
self.launcher = None


nodes: dict[str, Shake] = {}


def parse_args():
if len(sys.argv) != 3:
print(USAGE)
exit(1)
global REDIS_SHAKE_PATH, toml_template

# 1. check redis-shake path
REDIS_SHAKE_PATH = sys.argv[1]
if not Path(REDIS_SHAKE_PATH).is_file():
print(f"redis-shake path [{REDIS_SHAKE_PATH}] is not a file")
print(USAGE)
exit(1)
print(f"redis-shake path: {REDIS_SHAKE_PATH}")
REDIS_SHAKE_PATH = os.path.abspath(REDIS_SHAKE_PATH)
print(f"redis-shake abs path: {REDIS_SHAKE_PATH}")

# 2. check and load toml file
toml_template = toml.load(sys.argv[2])
print(toml_template)


def stop():
for shake in nodes.values():
shake.launcher.stop()
exit(0)


def loop():
last_allow_entries_count = {address: 0 for address in nodes.keys()}
last_disallow_entries_count = {address: 0 for address in nodes.keys()}
while True:
if stopped:
stop()
print(f"================ {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ================")

metrics = []
for address, shake in nodes.items():
try:
ret = requests.get(f"http://localhost:{shake.metrics_port}").json()
metrics.append(ret)
except requests.exceptions.RequestException as e:
print(f"get metrics from [{address}] failed: {e}")

for metric in sorted(metrics, key=lambda x: x["address"]):
address = metric['address']
if metric['rdb_file_size'] == 0:
print(f"{metric['address']} shaking")
elif metric['rdb_received_size'] < metric['rdb_file_size']:
print(f"{metric['address']} receiving rdb. "
f"percent=[{metric['rdb_received_size'] / metric['rdb_file_size'] * 100:.2f}]%, "
f"rdbFileSize=[{metric['rdb_file_size'] / 1024 / 1024 / 1024:.3f}]G, "
f"rdbReceivedSize=[{metric['rdb_received_size'] / 1024 / 1024 / 1024:.3f}]G")
elif metric['rdb_send_size'] < metric['rdb_file_size']:
print(f"{metric['address']} syncing rdb. "
f"percent=[{metric['rdb_send_size'] / metric['rdb_file_size'] * 100:.2f}]%, "
f"allowOps=[{(metric['allow_entries_count'] - last_allow_entries_count[address]) / SLEEP_SECONDS:.2f}], "
f"disallowOps=[{(metric['disallow_entries_count'] - last_disallow_entries_count[address]) / SLEEP_SECONDS:.2f}], "
f"entryId=[{metric['entry_id']}], "
f"InQueueEntriesCount=[{metric['in_queue_entries_count']}], "
f"unansweredBytesCount=[{metric['unanswered_bytes_count']}]bytes, "
f"rdbFileSize=[{metric['rdb_file_size'] / 1024 / 1024 / 1024:.3f}]G, "
f"rdbSendSize=[{metric['rdb_send_size'] / 1024 / 1024 / 1024:.3f}]G")
else:
print(f"{metric['address']} syncing aof. "
f"allowOps=[{(metric['allow_entries_count'] - last_allow_entries_count[address]) / SLEEP_SECONDS:.2f}], "
f"disallowOps=[{(metric['disallow_entries_count'] - last_disallow_entries_count[address]) / SLEEP_SECONDS:.2f}], "
f"entryId=[{metric['entry_id']}], "
f"InQueueEntriesCount=[{metric['in_queue_entries_count']}], "
f"unansweredBytesCount=[{metric['unanswered_bytes_count']}]bytes, "
f"diff=[{metric['aof_received_offset'] - metric['aof_applied_offset']}], "
f"aofReceivedOffset=[{metric['aof_received_offset']}], "
f"aofAppliedOffset=[{metric['aof_applied_offset']}]")
last_allow_entries_count[address] = metric['allow_entries_count']
last_disallow_entries_count[address] = metric['disallow_entries_count']

time.sleep(SLEEP_SECONDS)


def main():
parse_args()

# parse args
address = toml_template["source"]["address"]
host, port = address.split(":")
username = toml_template["source"]["username"]
password = toml_template["source"]["password"]
tls = toml_template["source"]["tls"]
print(f"host: {host}, port: {port}, username: {username}, password: {password}, tls: {tls}")
cluster = redis.RedisCluster(host=host, port=port, username=username, password=password, ssl=tls)
print("cluster nodes:", cluster.cluster_nodes())

# parse cluster nodes
for address, node in cluster.cluster_nodes().items():
if "master" in node["flags"]:
nodes[address] = Shake()
print(f"addresses:")
for k in nodes.keys():
print(k)

# create workdir and start redis-shake
shutil.rmtree("data")
os.mkdir("data")
os.chdir("data")
start_port = 11007
for address in nodes.keys():
workdir = address.replace(".", "_").replace(":", "_")

os.mkdir(workdir)
tmp_toml = toml_template
tmp_toml["source"]["address"] = address
start_port += 1
tmp_toml["advanced"]["metrics_port"] = start_port

with open(f"{workdir}/sync.toml", "w") as f:
toml.dump(tmp_toml, f)

# start redis-shake
launcher = Launcher(args=[REDIS_SHAKE_PATH, f"sync.toml"], work_dir=workdir)
nodes[address].launcher = launcher
nodes[address].metrics_port = start_port

signal.signal(signal.SIGINT, signal_handler)
print("start syncing...")
loop()


def signal_handler(sig, frame):
global stopped
print("\nYou pressed Ctrl+C!")
stopped = True


if __name__ == '__main__':
main()
34 changes: 34 additions & 0 deletions scripts/cluster_helper/launcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import os
import signal
import subprocess
from pathlib import Path


class Launcher:
def __init__(self, args, work_dir):
self.started = True
self.args = args
self.work_dir = work_dir
if not os.path.exists(work_dir):
Path(self.work_dir).mkdir(parents=True, exist_ok=True)
self.stdout_file = open(work_dir + "/stdout", 'a')
self.stderr_file = open(work_dir + "/stderr", 'a')
self.process = subprocess.Popen(self.args, stdout=self.stdout_file,
stderr=self.stderr_file, cwd=self.work_dir,
encoding="utf-8")

def __del__(self):
assert not self.started, "Every Launcher should be closed manually! work_dir:" + self.work_dir

def get_pid(self):
return self.process.pid

def stop(self):
if self.started:
self.started = False
print(f"Waiting for process {self.process.pid} to exit...")
self.stdout_file.close()
self.stderr_file.close()
self.process.send_signal(signal.SIGINT)
self.process.wait()
print(f"process {self.process.pid} exited.")
3 changes: 3 additions & 0 deletions scripts/cluster_helper/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
redis==4.3.4
requests==2.27.1
toml==0.10.2

0 comments on commit ca66203

Please sign in to comment.