diff --git a/.editorconfig b/.editorconfig
index 3404fe3..c893398 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -16,3 +16,6 @@ trim_trailing_whitespace = true
 
 max_line_length = off
 trim_trailing_whitespace = false
+[{*.js,*.j2}]
+indent_style = space
+indent_size = 2
\ No newline at end of file
diff --git a/.github/workflows/import.yaml b/.github/workflows/import.yaml
index 3ad0110..96915fd 100644
--- a/.github/workflows/import.yaml
+++ b/.github/workflows/import.yaml
@@ -70,5 +70,8 @@ jobs:
 
       - name: import the data
         run: |
-          python3 run.py nebula importer
-
+          python3 run.py nebula importer
+
+      - name: run stress testing
+        run: |
+          python3 run.py stress run -d 3
diff --git a/.gitignore b/.gitignore
index d29076a..9956232 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,4 +24,5 @@ mysql/data
 .pytest_cache
 .env
 nebula-bench.db
-.vscode
\ No newline at end of file
+.vscode
+output
\ No newline at end of file
diff --git a/README.md b/README.md
index b630755..12f667f 100644
--- a/README.md
+++ b/README.md
@@ -95,7 +95,39 @@ python3 run.py nebula importer --dry-run
 
 ### nebula benchmark
 
-Work in progress.
+Use [k6](https://github.com/k6io/k6) with the [xk6-nebula](https://github.com/HarrisChu/xk6-nebula) extension.
+Note that the default `k6` binary in the `scripts` folder is built for Linux. If you want to
+run the tool on macOS, please download the binary yourself from [xk6-nebula](https://github.com/HarrisChu/xk6-nebula/tags).
+
+Scenarios are in `nebula_bench/scenarios/`.
+
+```bash
+# show help
+python3 run.py stress run --help
+
+# run all scenarios with 100 virtual users; every scenario lasts 60 seconds.
+python3 run.py stress run
+
+# run all scenarios with 10 virtual users; every scenario lasts 3 seconds.
+python3 run.py stress run -vu 10 -d 3
+
+# run only the go.Go1Step scenario with 10 virtual users; it lasts 3 seconds.
+python3 run.py stress run -vu 10 -d 3 -s go.Go1Step
+```
+
+The k6 config files, summary results and outputs are in the `output` folder, e.g.
+
+```bash
+# you should install jq to parse the JSON results.
+# check results
+jq .metrics.checks output/result_Go1Step.json
+
+# latency summary
+jq .metrics.latency output/result_Go1Step.json
+
+# summarize the error messages
+awk -F ',' 'NR>1{print $NF}' output/output_Go1Step.csv | sort | uniq -c
+```
 
 ## and more
 
diff --git a/README_cn.md b/README_cn.md
index b9f78b4..4f9b60f 100644
--- a/README_cn.md
+++ b/README_cn.md
@@ -88,7 +88,40 @@ python3 run.py nebula importer --dry-run
 
 ### nebula benchmark
 
-进行中，当前可以通过手动调整 Jmeter 来测试，具体参考 [jmx](ldbc/jmx/go_step.jmx) 和 [java](util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java)。
+使用带有 [xk6-nebula](https://github.com/HarrisChu/xk6-nebula) 插件的 [K6](https://github.com/k6io/k6) 来进行压测。
+需要注意，默认的 `k6` 是 linux 下编译的，如果需要在 Mac OS 上使用，请自行下载对应的二进制文件。[xk6-nebula](https://github.com/HarrisChu/xk6-nebula/tags)
+
+自动化的场景，在 `nebula_bench/scenarios/` 中。
+
+```bash
+# show help
+python3 run.py stress run --help
+
+# run all scenarios with 100 virtual users; every scenario lasts 60 seconds.
+python3 run.py stress run
+
+# run all scenarios with 10 virtual users; every scenario lasts 3 seconds.
+python3 run.py stress run -vu 10 -d 3
+
+# run only the go.Go1Step scenario with 10 virtual users; it lasts 3 seconds.
+python3 run.py stress run -vu 10 -d 3 -s go.Go1Step
+```
+
+The k6 config files, summary results and outputs are in the `output` folder, e.g.
+
+```bash
+# you should install jq to parse the JSON results.
+# check results
+jq .metrics.checks output/result_Go1Step.json
+
+# latency summary
+jq .metrics.latency output/result_Go1Step.json
+
+# summarize the error messages
+awk -F ',' 'NR>1{print $NF}' output/output_Go1Step.csv | sort | uniq -c
+```
+
+如果使用 Jmeter，暂时没有自动化操作，可以通过手动调整 Jmeter 来测试，具体参考 [jmx](ldbc/jmx/go_step.jmx) 和 [java](util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java)。
 
 ## 更多
 
diff --git a/nebula_bench/cli.py b/nebula_bench/cli.py
index 08d4e0f..4c95c02 100644
--- a/nebula_bench/cli.py
+++ b/nebula_bench/cli.py
@@ -5,6 +5,8 @@ from nebula_bench.utils import logger
 
 from nebula_bench.controller import NebulaController
 from nebula_bench.utils import run_process
+from nebula_bench.stress import StressFactory
+
 
 SH_COMMAND = "/bin/bash"
 
@@ -102,21 +104,60 @@ def importer(folder, address, user, password, space, vid_type, dry_run):
     nc.release()
 
 
-# @nebula.command(help="initial nebula graph, including create indexes")
-# @common
-# def init(folder, address, user, password, space):
-#     nc = NebulaController(
-#         data_folder=folder,
-#         user=user,
-#         password=password,
-#         address=address,
-#         space=space,
-#         vid_type="int",
-#     )
+@nebula.command(help="initialize nebula graph, including creating indexes")
+@common
+def init(folder, address, user, password, space):
+    nc = NebulaController(
+        data_folder=folder,
+        user=user,
+        password=password,
+        address=address,
+        space=space,
+        vid_type="int",
+    )
+
+    nc.init_space()
+
 
-#     nc.init_space()
+@cli.group()
+def stress():
+    pass
 
 
-# @cli.group()
-# def stress():
-#     pass
+@stress.command()
+@common
+@click.option(
+    "-t",
+    "--vid-type",
+    default="int",
+    help="space vid type, should be one of [int, string], default: int",
+)
+@click.option("-vu", default=100, help="concurrent virtual users, default: 100")
+@click.option(
+    "-d", "--duration", default=60, help="duration for every scenario, unit: second, default: 60"
+)
+@click.option("-s", "--scenarios", default="all", help="specific scenarios, e.g. go.Go1Step")
+@click.option("-c", "--controller", default="k6", help="which test tool to use, default: k6")
+@click.option(
+    "--dry-run",
+    default=False,
+    is_flag=True,
+    help="dry run, only dump the stress testing config file, default: False",
+)
+def run(
+    folder, address, user, password, space, vid_type, scenarios, controller, vu, duration, dry_run
+):
+    stress = StressFactory.gen_stress(
+        _type=controller,
+        folder=folder,
+        address=address,
+        user=user,
+        password=password,
+        space=space,
+        vid_type=vid_type,
+        scenarios=scenarios,
+        vu=vu,
+        duration=duration,
+        dry_run=dry_run,
+    )
+    stress.run()
diff --git a/nebula_bench/common/base.py b/nebula_bench/common/base.py
index ecd42a9..3471cd4 100644
--- a/nebula_bench/common/base.py
+++ b/nebula_bench/common/base.py
@@ -1,7 +1,6 @@
 # -*- encoding: utf-8 -*-
 import re
 import time
-import gevent
 from collections import deque
 import csv
 from pathlib import Path
@@ -138,85 +137,19 @@ def __new__(cls, name, bases, attrs, *args, **kwargs):
         # super(ScenarioMeta, cls).__new__(cls, name, bases, attrs, *args, **kwargs)
         if name == "BaseScenario":
             return type.__new__(cls, name, bases, attrs)
-        report_name = attrs.get("report_name")
-        result_file_name = attrs.get("result_file_name")
-        if result_file_name is None:
-            result_file_name = "_".join(report_name.split(" "))
-        attrs["result_file_name"] = result_file_name
-        statement = attrs.get("statement")
-        parameters = attrs.get("parameters") or ()
-        latency_warning_us = attrs.get("latency_warning_us")
-        _generator = StmtGenerator(statement, parameters, setting.DATA_FOLDER)
-        flag = False
-
-        attrs["generator"] = _generator
-        attrs["client"] = NebulaClient()
-
-        def my_task(self):
-            nonlocal flag
-
-            stmt = next(_generator)
-
-            # sleep for first request
-            if not flag:
-                logger.info("first stmt is {}".format(stmt))
-                gevent.sleep(3)
-                flag = True
-
-            cur_time = time.monotonic()
-            r = self.client.execute(stmt)
-            total_time = time.monotonic() - cur_time
-            assert isinstance(r, ResultSet)
-            # warning the latency for slow statement.
-            if latency_warning_us is not None:
-                if r.latency() > latency_warning_us:
-                    logger.warning("the statement [{}] latency is {} us".format(stmt, r.latency()))
-            if r.is_succeeded():
-                self.environment.events.request_success.fire(
-                    request_type="Nebula",
-                    name=report_name,
-                    response_time=total_time * 1000,
-                    response_length=0,
-                )
-            else:
-                logger.error(
-                    "the statement [{}] is not succeeded, error message is {}".format(
-                        stmt, r.error_msg()
-                    )
-                )
-                self.environment.events.request_failure.fire(
-                    request_type="Nebula",
-                    name=report_name,
-                    response_time=total_time * 1000,
-                    response_length=0,
-                    exception=Exception(r.error_msg()),
-                )
-
-        attrs["tasks"] = [my_task]
+        if attrs.get("name", None) is None:
+            attrs["name"] = name
         return type.__new__(cls, name, bases, attrs)
 
 
 class BaseScenario(metaclass=ScenarioMeta):
     abstract = True
 
-    report_name: str
-    result_file_name: str
-    statement: str
-    parameters = ()
-
-    def __init__(self, environment):
-        from locust.user.users import UserMeta
-
-        self.environment = environment
-
-    def on_start(self):
-        self.client.add_session()
-
-    def on_stop(self):
-        self.client.release_session()
-
-
-query = namedtuple("query", ["name", "stmt"])
+    nGQL: str
+    stage: dict
+    csv_path: str
+    csv_index: list
+    name: str
 
 
 class BaseQuery(object):
diff --git a/nebula_bench/scenarios/find_path.py b/nebula_bench/scenarios/find_path.py
new file mode 100644
index 0000000..61ff675
--- /dev/null
+++ b/nebula_bench/scenarios/find_path.py
@@ -0,0 +1,9 @@
+# -*- encoding: utf-8 -*-
+from nebula_bench.common.base import BaseScenario
+
+
+class FindShortestPath(BaseScenario):
+    abstract = False
+    nGQL = "FIND SHORTEST PATH FROM {} TO {} OVER *"
+    csv_path = "social_network/dynamic/person_knows_person.csv"
+    csv_index = [0, 1]
diff --git a/nebula_bench/scenarios/go.py b/nebula_bench/scenarios/go.py
new file mode 100644
index 0000000..b625612
--- /dev/null
+++ b/nebula_bench/scenarios/go.py
@@ -0,0 +1,23 @@
+# -*- encoding: utf-8 -*-
+from nebula_bench.common.base import BaseScenario
+
+
+class BaseGoScenario(BaseScenario):
+    abstract = True
+    nGQL = "GO 1 STEP FROM {} OVER KNOWS"
+    csv_path = "social_network/dynamic/person.csv"
+    csv_index = [0]
+
+
+class Go1Step(BaseGoScenario):
+    abstract = False
+    nGQL = "GO 1 STEP FROM {} OVER KNOWS"
+
+
+class Go2Step(BaseGoScenario):
+    abstract = False
+    nGQL = "GO 2 STEP FROM {} OVER KNOWS"
+
+class Go3Step(BaseGoScenario):
+    abstract = False
+    nGQL = "GO 3 STEP FROM {} OVER KNOWS"
diff --git a/nebula_bench/stress.py b/nebula_bench/stress.py
new file mode 100644
index 0000000..e19dac9
--- /dev/null
+++ b/nebula_bench/stress.py
@@ -0,0 +1,149 @@
+# -*- encoding: utf-8 -*-
+import sys
+import inspect
+from pathlib import Path
+
+from nebula_bench.utils import load_class, jinja_dump, run_process
+from nebula_bench.common.base import BaseScenario
+from nebula_bench.utils import logger
+from nebula_bench import setting
+
+
+class Stress(object):
+    def __init__(
+        self,
+        folder,
+        address,
+        user,
+        password,
+        space,
+        vid_type,
+        scenarios,
+        vu,
+        duration,
+        dry_run,
+        **kwargs
+    ):
+        self.folder = folder or setting.DATA_FOLDER
+        self.address = address or setting.NEBULA_ADDRESS
+        self.user = user or setting.NEBULA_USER
+        self.password = password or setting.NEBULA_PASSWORD
+        self.space = space or setting.NEBULA_SPACE
+        self.vid_type = vid_type
+        self.scenarios = []
+        self.output_folder = "output"
+        self.vu = vu
+        self.duration = duration
+        self.dry_run = dry_run
+        self.load_scenarios(scenarios)
+
+    def load_scenarios(self, scenarios):
+        if scenarios.strip().upper() == "ALL":
+            self.scenarios = load_class("nebula_bench.scenarios", True, BaseScenario)
+        else:
+            self.scenarios = load_class("nebula_bench.scenarios", False, BaseScenario, scenarios)
+
+        self.scenarios = [x for x in self.scenarios if not x.abstract]
+        logger.info("total stress test scenarios: {}".format(len(self.scenarios)))
+
+    # dump config file
+    def dump_config(self, scenario):
+        pass
+
+    def run(self):
+        pass
+
+
+class StressFactory(object):
+    type_list = ["K6"]
+
+    @classmethod
+    def gen_stress(
+        cls,
+        _type,
+        folder,
+        address,
+        user,
+        password,
+        space,
+        vid_type,
+        scenarios,
+        vu,
+        duration,
+        dry_run=None,
+        **kwargs
+    ):
+        if _type.upper() not in cls.type_list:
+            raise Exception("test tool {} is not implemented".format(_type))
+
+        clazz = cls.get_all_stress_class().get("{}Stress".format(_type.upper()), None)
+        return clazz(
+            folder,
+            address,
+            user,
+            password,
+            space,
+            vid_type,
+            scenarios,
+            vu,
+            duration,
+            dry_run,
+            **kwargs
+        )
+
+    @classmethod
+    def get_all_stress_class(cls):
+        r = {}
+        current_module = sys.modules[__name__]
+        for name, clazz in inspect.getmembers(current_module):
+            if inspect.isclass(clazz) and issubclass(clazz, Stress):
+                r[name] = clazz
+        return r
+
+
+class K6Stress(Stress):
+    def dump_config(self, scenario):
+        assert issubclass(scenario, BaseScenario)
+        name = scenario.name
+        kwargs = {
+            "address": self.address,
+            "user": self.user,
+            "password": self.password,
+            "space": self.space,
+            "csv_path": "{}/{}".format(self.folder, scenario.csv_path),
+            "output_path": "{}/output_{}.csv".format(self.output_folder, name),
+            "nGQL": scenario.nGQL,
+        }
+
+        kwargs["param"] = ",".join(["d[" + str(x) + "]" for x in scenario.csv_index])
+        logger.info(
+            "begin dumping stress config, config file is {}".format(
+                "{}/{}.js".format(self.output_folder, name)
+            )
+        )
+        jinja_dump("k6_config.js.j2", "{}/{}.js".format(self.output_folder, name), kwargs)
+
+    def run(self):
+        logger.info("run stress test with k6")
+        logger.info(
+            "every scenario will run with {} vus and last {} seconds".format(self.vu, self.duration)
+        )
+        Path(self.output_folder).mkdir(exist_ok=True)
+        for scenario in self.scenarios:
+
+            self.dump_config(scenario)
+            # run k6
+            command = [
+                "scripts/k6",
+                "run",
+                "{}/{}.js".format(self.output_folder, scenario.name),
+                "-u",
+                str(self.vu),
+                "-d",
+                "{}s".format(self.duration),
+                "--summary-export",
+                "{}/result_{}.json".format(self.output_folder, scenario.name),
+            ]
+            if self.dry_run:
+                continue
+            run_process(command)
diff --git a/nebula_bench/utils.py b/nebula_bench/utils.py
index eab9fc7..7683a29 100644
--- a/nebula_bench/utils.py
+++ b/nebula_bench/utils.py
@@ -22,13 +22,15 @@ def load_class(package_name, load_all, base_class, class_name=None):
     r = []
     if load_all:
         _package = importlib.import_module(package_name)
-        p = Path(_package.__path__[0])
+        for namespace_p in _package.__path__:
+            p = Path(namespace_p)
+            break
         for _module_path in p.iterdir():
             name = _module_path.name.rsplit(".", 1)[0]
             _module = importlib.import_module(package_name + "." + name)
             for name in dir(_module):
                 _class = getattr(_module, name)
-                if type(_class) != type:
+                if not isinstance(_class, type):
                     continue
                 if issubclass(_class, base_class) and _class.__name__ != base_class.__name__:
                     r.append(_class)
diff --git a/scripts/k6 b/scripts/k6
new file mode 100755
index 0000000..a4b1146
Binary files /dev/null and b/scripts/k6 differ
diff --git a/templates/k6_config.js.j2 b/templates/k6_config.js.j2
new file mode 100644
index 0000000..aa8dd05
--- /dev/null
+++ b/templates/k6_config.js.j2
@@ -0,0 +1,48 @@
+import nebulaPool from 'k6/x/nebulagraph';
+import { check } from 'k6';
+import { Trend } from 'k6/metrics';
+import { sleep } from 'k6';
+
+var latencyTrend = new Trend('latency');
+var responseTrend = new Trend('responseTime');
+// initialize the nebula connection pool
+var pool = nebulaPool.init("{{ address }}", 2000);
+// initialize a session for every virtual user
+var session = pool.getSession("{{ user }}", "{{ password }}")
+session.execute("USE {{ space }}")
+
+
+String.prototype.format = function () {
+  var formatted = this;
+  for (var arg in arguments) {
+    formatted = formatted.replace("{}", arguments[arg]);
+  }
+  return formatted;
+};
+
+export function setup() {
+  // configure the csv data file
+  pool.configCSV("{{ csv_path }}", "|", false)
+  // configure the output file, which saves per-query information
+  pool.configOutput("{{ output_path }}")
+  sleep(1)
+}
+
+export default function (data) {
+
+  // get one row of data from the csv file
+  let d = pool.getData()
+  // d[0] is the first column of the csv row
+  let ngql = "{{ nGQL }}".format({{ param }})
+  let response = session.execute(ngql)
+  check(response, {
+    "IsSucceed": (r) => r.isSucceed() === true
+  });
+  // record the latency and response time trends
+  latencyTrend.add(response.getLatency());
+  responseTrend.add(response.getResponseTime());
+};
+
+export function teardown() {
+  pool.close()
+}
\ No newline at end of file