Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions .github/workflows/slo_cpp_sdk.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
name: SLO CPP SDK

on:
workflow_dispatch:
inputs:
github_pull_request_number:
required: true
slo_workload_duration_seconds:
default: '600'
required: false
slo_workload_read_max_rps:
default: '1000'
required: false
slo_workload_write_max_rps:
default: '100'
required: false

jobs:
ydb-slo-action:
name: Run YDB SLO Tests
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: write

strategy:
matrix:
include:
- workload: table
connection-string: grpc://localhost:2135/?database=/Root/testdb
run-args: |
--metrics-push-url http://localhost:9090/api/v1/otlp/v1/metrics \
--time ${{inputs.slo_workload_duration_seconds || 600}} \
--read-rps ${{inputs.slo_workload_read_max_rps || 1000}} \
--write-rps ${{inputs.slo_workload_write_max_rps || 100}} \
--read-timeout 1000 \
--write-timeout 1000

concurrency:
group: slo-${{ github.ref }}
cancel-in-progress: true

steps:
- name: Checkout
uses: actions/checkout@v4

- name: Configure git
uses: ./.github/actions/configure_git

- name: Build SLO Workload Binary
run: |
./ya make ydb/public/sdk/cpp/tests/slo_workloads/key_value -r

- name: Initialize YDB SLO
uses: ydb-platform/ydb-slo-action/init@main
with:
github_pull_request_number: ${{ github.event.inputs.github_pull_request_number }}
github_token: ${{ secrets.GITHUB_TOKEN }}
workload_name: ${{ matrix.workload }}
ydb_database_node_count: 5

- name: Prepare SLO Database
run: |
./ydb/public/sdk/cpp/tests/slo_workloads/key_value/key_value ${{ matrix.connection-string }} create

- name: Run SLO Tests
run: |
./ydb/public/sdk/cpp/tests/slo_workloads/key_value/key_value ${{ matrix.connection-string }} run ${{ matrix.run-args }}

- if: always()
name: Store ydb chaos testing logs
run: |
docker logs ydb-chaos > chaos-ydb.log

- if: always()
uses: actions/upload-artifact@v4
with:
name: ${{ matrix.workload }}-chaos-ydb.log
path: ./chaos-ydb.log
retention-days: 1

- if: always()
name: Cleanup SLO Database
run: |
./ydb/public/sdk/cpp/tests/slo_workloads/key_value/key_value ${{ matrix.connection-string }} cleanup

- name: Publish YDB SLO Report
uses: ydb-platform/ydb-slo-action/report@main
with:
github_run_id: ${{ github.event.workflow_run.id }}
github_token: ${{ secrets.GITHUB_TOKEN }}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ using namespace NYdb::NTable;


TGenerateInitialContentJob::TGenerateInitialContentJob(const TCreateOptions& createOpts, std::uint32_t maxId)
: TThreadJob(createOpts.CommonOptions)
: TThreadJob(createOpts.CommonOptions, "generate")
, Executor(createOpts.CommonOptions, Stats, TExecutor::ModeBlocking)
, PackGenerator(
createOpts.CommonOptions
Expand Down Expand Up @@ -81,5 +81,4 @@ UPSERT INTO `%s` SELECT * FROM AS_TABLE($items);
void TGenerateInitialContentJob::OnFinish() {
Executor.Finish();
Executor.Wait();
Stats.Flush();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ int DoRun(TDatabaseOptions& dbOptions, int argc, char** argv) {

if (!runOptions.DontRunA) {
runOptions.CommonOptions.Rps = runOptions.Read_rps;
runOptions.CommonOptions.ReactionTime = TDuration::MilliSeconds(runOptions.CommonOptions.A_ReactionTime);
runOptions.CommonOptions.ReactionTime = runOptions.ReadTimeout;
jobs->Add(new TReadJob(runOptions.CommonOptions, maxId));
}
if (!runOptions.DontRunB) {
runOptions.CommonOptions.Rps = runOptions.Write_rps;
runOptions.CommonOptions.ReactionTime = DefaultReactionTime;
runOptions.CommonOptions.ReactionTime = runOptions.WriteTimeout;
jobs->Add(new TWriteJob(runOptions.CommonOptions, maxId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class TReadJob : public TThreadJob {
private:
std::unique_ptr<TExecutor> Executor;
std::uint32_t ObjectIdRange;
bool SaveResult;
};

int CreateTable(TDatabaseOptions& dbOptions);
Expand Down
12 changes: 3 additions & 9 deletions ydb/public/sdk/cpp/tests/slo_workloads/key_value/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using namespace NYdb;
using namespace NYdb::NTable;

TWriteJob::TWriteJob(const TCommonOptions& opts, std::uint32_t maxId)
: TThreadJob(opts)
: TThreadJob(opts, "write")
, Executor(opts, Stats, TExecutor::ModeNonBlocking)
, Generator(opts, maxId)
{}
Expand Down Expand Up @@ -77,15 +77,13 @@ UPSERT INTO `%s` SELECT * FROM AS_TABLE($items);
void TWriteJob::OnFinish() {
Executor.Finish();
Executor.Wait();
Stats.Flush();
}

// Implementation of TReadJob
TReadJob::TReadJob(const TCommonOptions& opts, std::uint32_t maxId)
: TThreadJob(opts)
, Executor(opts.RetryMode ? new TExecutorWithRetry(opts, Stats) : new TExecutor(opts, Stats))
: TThreadJob(opts, "read")
, Executor(std::make_unique<TExecutor>(opts, Stats))
, ObjectIdRange(static_cast<std::uint32_t>(maxId * 1.25)) // 20% of requests with no result
, SaveResult(opts.SaveResult)
{}

void TReadJob::ShowProgress(TStringBuilder& report) {
Expand Down Expand Up @@ -154,8 +152,4 @@ void TReadJob::OnFinish() {
if (infly) {
Cerr << "Warning: thread A finished while having " << infly << " infly requests." << Endl;
}
Stats.Flush();
if (SaveResult) {
Stats.SaveResult();
}
}
5 changes: 0 additions & 5 deletions ydb/public/sdk/cpp/tests/slo_workloads/key_value/ya.make
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
SUBSCRIBER(
pnv1
g:kikimr
)

PROGRAM()

SRCS(
Expand Down
108 changes: 9 additions & 99 deletions ydb/public/sdk/cpp/tests/slo_workloads/utils/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,17 @@ bool TExecutor::Execute(const NYdb::NTable::TTableClient::TOperationFunc& func)
}
}

TStatUnit stat = Stats.CreateStatUnit();
auto stat = Stats.StartRequest();

auto future = InsistentClient.ExecuteWithRetry([func, stat](NYdb::NTable::TSession session) {
auto result = func(session);
return result;
});

auto future = InsistentClient.ExecuteWithRetry(func);
future.Subscribe([this, stat, SemaphoreWrapper](const TAsyncFinalStatus& future) mutable {
Y_ABORT_UNLESS(future.HasValue());
TFinalStatus resultStatus = future.GetValue();
Stats.Report(stat, resultStatus);
Stats.FinishRequest(stat, resultStatus);
if (resultStatus) {
CheckForError(*resultStatus);
}
Expand Down Expand Up @@ -351,7 +355,6 @@ bool TExecutor::IsStopped() {
}

void TExecutor::Finish() {
// Stats.UpdateSessionStats(InsistentClient.GetSessionStats());
with_lock(Lock) {
if (!AllJobsLaunched) {
AllJobsLaunched = true;
Expand All @@ -361,9 +364,6 @@ void TExecutor::Finish() {
}

void TExecutor::UpdateStats() {
if (Infly > MaxSecInfly) {
MaxSecInfly = Infly;
}
std::uint64_t activeSessions = InsistentClient.GetActiveSessions();
if (activeSessions > MaxSecSessions) {
MaxSecSessions = activeSessions;
Expand All @@ -382,10 +382,10 @@ void TExecutor::UpdateStats() {
}

void TExecutor::ReportStats() {
Stats.ReportStats(MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
TInstant now = TInstant::Now();
if (now.Seconds() > LastReportSec) {
Stats.ReportStats(MaxSecInfly, MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
MaxSecInfly = 0;
Stats.ReportStats(MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
MaxSecSessions = 0;
MaxSecReadPromises = 0;
MaxSecExecutorPromises = 0;
Expand Down Expand Up @@ -442,93 +442,3 @@ void TExecutor::Report(TStringBuilder& out) const {
}
}
}


TExecutorWithRetry::TExecutorWithRetry(const TCommonOptions& opts, TStat& stats)
: TExecutor(opts, stats)
{}

bool TExecutorWithRetry::Execute(const NYdb::NTable::TTableClient::TOperationFunc& func) {
auto threadFunc = [this, func]() {
if (IsStopped()) {
DecrementWaiting();
return;
}

with_lock(Lock) {
--Waiting;
if (Infly < Opts.MaxInfly) {
++Infly;
if (Infly > MaxInfly) {
MaxInfly = Infly;
}
UpdateStats();
} else {
Stats.ReportMaxInfly();
UpdateStats();
return;
}
}

std::shared_ptr<TRetryContext> context = std::make_shared<TRetryContext>(Stats);

auto executeOperation = [this, func]() {
return InsistentClient.ExecuteWithRetry(func);
};

context->HandleStatusFunc = std::make_unique<std::function<void(const TAsyncFinalStatus& resultFuture)>>(
[this, executeOperation, context](const TAsyncFinalStatus& future) mutable {
Y_ABORT_UNLESS(future.HasValue());
TFinalStatus resultStatus = future.GetValue();
if (resultStatus) {
// Reply received
CheckForError(*resultStatus);
if (resultStatus->IsSuccess()) {
//Ok received
Stats.Report(context->LifeTimeStat, resultStatus->GetStatus());
DecrementInfly();
context->HandleStatusFunc.reset();
return;
}
}
if (IsStopped() || TInstant::Now() - context->LifeTimeStat.Start > GlobalTimeout) {
// Application stopped working or global timeout reached. Ok reply hasn't received yet
Stats.Report(context->LifeTimeStat, TInnerStatus::StatusNotFinished);
DecrementInfly();
context->HandleStatusFunc.reset();
return;
}
Stats.Report(context->PerRequestStat, resultStatus);
context->PerRequestStat = Stats.CreateStatUnit();
// Retrying:
executeOperation().Subscribe(*context->HandleStatusFunc);
});

context->Retries.fetch_add(1);
Y_ABORT_UNLESS(context->Retries.load() < 500, "Too much retries");

executeOperation().Subscribe(*context->HandleStatusFunc);
};

if (IsStopped()) {
return false;
}

bool CanLaunchJob = false;

with_lock(Lock) {
if (!AllJobsLaunched) {
CanLaunchJob = true;
++Waiting;
}
}

if (CanLaunchJob) {
if (!InputQueue->AddFunc(threadFunc)) {
DecrementWaiting();
}
}
++InProgressCount;
InProgressSum += InputQueue->Size();
return true;
}
24 changes: 2 additions & 22 deletions ydb/public/sdk/cpp/tests/slo_workloads/utils/executor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "statistics.h"
#include "utils.h"

#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
Expand Down Expand Up @@ -56,7 +57,7 @@ class TInsistentClient {
bool Valid = true;
};

struct TOperationContext : public TThrRefBase {
struct TOperationContext {
bool Finished = false;
TAdaptiveLock Lock;
TCheckedIterator RetryIter;
Expand Down Expand Up @@ -131,7 +132,6 @@ class TExecutor {
std::uint32_t Wait(TDuration waitTimeout);
bool IsStopped();
void Finish();
std::uint32_t GetTotal() const;
void Report(TStringBuilder& out) const;

protected:
Expand Down Expand Up @@ -167,8 +167,6 @@ class TExecutor {
TInstant Deadline;
// Last second we reported Infly
std::uint64_t LastReportSec = 0;
// Max infly for current second
std::uint64_t MaxSecInfly = 0;
// Max Active sessions for current second
std::uint64_t MaxSecSessions = 0;

Expand All @@ -178,21 +176,3 @@ class TExecutor {
std::uint64_t MaxSecReadPromises = 0;
std::uint64_t MaxSecExecutorPromises = 0;
};

class TExecutorWithRetry : public TExecutor {
public:
struct TRetryContext {
TRetryContext(TStat& stat)
: LifeTimeStat(stat.CreateStatUnit())
, PerRequestStat(stat.CreateStatUnit())
{}

TStatUnit LifeTimeStat;
TStatUnit PerRequestStat;
std::unique_ptr<std::function<void(const TAsyncFinalStatus& resultFuture)>> HandleStatusFunc;
std::atomic<std::uint64_t> Retries = 0;
};

TExecutorWithRetry(const TCommonOptions& opts, TStat& stats);
bool Execute(const NYdb::NTable::TTableClient::TOperationFunc& func) override;
};
Loading
Loading