From 389ff3e86b5f775fcb43d9b11a647b0a6e3caeae Mon Sep 17 00:00:00 2001
From: Oleg Doronin
Date: Sun, 23 Nov 2025 08:29:26 +0000
Subject: [PATCH] S3 downloader async job

---
 .../backup/async_jobs/import_downloader.cpp   | 103 ++++++++++++
 .../backup/async_jobs/import_downloader.h     |  12 ++
 .../async_jobs/ut/ut_import_downloader.cpp    | 149 ++++++++++++++++++
 .../columnshard/backup/async_jobs/ut/ya.make  |  27 ++++
 .../tx/columnshard/backup/async_jobs/ya.make  |  15 ++
 5 files changed, 306 insertions(+)
 create mode 100644 ydb/core/tx/columnshard/backup/async_jobs/import_downloader.cpp
 create mode 100644 ydb/core/tx/columnshard/backup/async_jobs/import_downloader.h
 create mode 100644 ydb/core/tx/columnshard/backup/async_jobs/ut/ut_import_downloader.cpp
 create mode 100644 ydb/core/tx/columnshard/backup/async_jobs/ut/ya.make
 create mode 100644 ydb/core/tx/columnshard/backup/async_jobs/ya.make

diff --git a/ydb/core/tx/columnshard/backup/async_jobs/import_downloader.cpp b/ydb/core/tx/columnshard/backup/async_jobs/import_downloader.cpp
new file mode 100644
index 000000000000..cf3e018e6d95
--- /dev/null
+++ b/ydb/core/tx/columnshard/backup/async_jobs/import_downloader.cpp
@@ -0,0 +1,103 @@
+#include "import_downloader.h"
+
+#include
+#include
+#include
+
+namespace NKikimr::NColumnShard::NBackup {
+
+TConclusion<std::unique_ptr<NActors::IActor>> CreateAsyncJobImportDownloader(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo) {
+    const auto settingsKind = restoreTask.GetSettingsCase();
+    switch (settingsKind) {
+        case NKikimrSchemeOp::TRestoreTask::kS3Settings:
+#ifndef KIKIMR_DISABLE_S3_OPS
+            return std::unique_ptr<NActors::IActor>(CreateS3Downloader(subscriberActorId, txId, restoreTask, tableInfo));
+#else
+            return TConclusionStatus::Fail("Imports from S3 are disabled");
+#endif
+        default:
+            return TConclusionStatus::Fail(TStringBuilder() << "Unknown settings: " << static_cast<ui32>(settingsKind));
+    }
+}
+
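+// TImportDownloader drives a single restore attempt: Bootstrap() spawns the
+// concrete downloader (currently only S3) as a child actor, then services its
+// bookkeeping requests (download-info lookups and checkpoint stores) and the
+// row batches it uploads, converting them into Arrow record batches.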
+class TImportDownloader: public TActorBootstrapped<TImportDownloader> {
+public:
+    TImportDownloader(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo)
+        : SubscriberActorId(subscriberActorId)
+        , TxId(txId)
+        , RestoreTask(restoreTask)
+        , TableInfo(tableInfo) {
+    }
+
+    void Bootstrap() {
+        Register(CreateAsyncJobImportDownloader(SelfId(), TxId, RestoreTask, TableInfo).DetachResult().release());
+        Become(&TThis::StateMain);
+    }
+
+    STRICT_STFUNC(
+        StateMain,
+        hFunc(NKikimr::TEvDataShard::TEvGetS3DownloadInfo, Handle)
+        hFunc(NKikimr::TEvDataShard::TEvStoreS3DownloadInfo, Handle)
+        hFunc(NKikimr::TEvDataShard::TEvS3UploadRowsRequest, Handle)
+        hFunc(NDataShard::TDataShard::TEvPrivate::TEvAsyncJobComplete, Handle)
+    )
+
+    void Handle(NKikimr::TEvDataShard::TEvGetS3DownloadInfo::TPtr& ev) {
+        Cerr << "TEvGetS3DownloadInfo: " << ev->Get()->ToString() << Endl;
+        // No persisted download state yet: reply with an empty info record.
+        Send(ev->Sender, new NKikimr::TEvDataShard::TEvS3DownloadInfo());
+    }
+
+    void Handle(NKikimr::TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev) {
+        // Echo the info back as the stored state (no durable storage yet).
+        Send(ev->Sender, new NKikimr::TEvDataShard::TEvS3DownloadInfo(ev->Get()->Info));
+    }
+
+    TVector<std::pair<TString, NScheme::TTypeInfo>> MakeYdbSchema() {
+        return {{"key", NScheme::TTypeInfo(NScheme::NTypeIds::String)}, {"value", NScheme::TTypeInfo(NScheme::NTypeIds::String)}};
+    }
+
+    void Handle(NKikimr::TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev) {
+        TSerializedCellVec keyCells;
+        TSerializedCellVec valueCells;
+
+        NArrow::TArrowBatchBuilder batchBuilder;
+        const auto startStatus = batchBuilder.Start(MakeYdbSchema());
+        if (!startStatus.ok()) {
+            // TODO: propagate the error to the subscriber; for now just drop the request.
+            return;
+        }
+
+        for (const auto& r : ev->Get()->Record.GetRows()) {
+            // TODO: use safe parsing!
+            keyCells.Parse(r.GetKeyColumns());
+            valueCells.Parse(r.GetValueColumns());
+            batchBuilder.AddRow(keyCells.GetCells(), valueCells.GetCells());
+        }
+
+        // TODO: hand the batch over to a consumer; it is only built and dropped for now.
+        auto resultBatch = batchBuilder.FlushBatch(false);
+        Y_UNUSED(resultBatch);
+
+        auto response = new NKikimr::TEvDataShard::TEvS3UploadRowsResponse();
+        response->Info = ev->Get()->Info;
+        Send(ev->Sender, response);
+    }
+
+    void Handle(NDataShard::TDataShard::TEvPrivate::TEvAsyncJobComplete::TPtr& ev) {
+        // TODO: notify SubscriberActorId about completion.
+        Y_UNUSED(ev);
+    }
+
+private:
+    NActors::TActorId SubscriberActorId;
+    ui64 TxId;
+    NKikimrSchemeOp::TRestoreTask RestoreTask;
+    NKikimr::NDataShard::TTableInfo TableInfo;
+};
+
+std::unique_ptr<NActors::IActor> CreateImportDownloaderImport(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo) {
+    return std::make_unique<TImportDownloader>(subscriberActorId, txId, restoreTask, tableInfo);
+}
+
+} // namespace NKikimr::NColumnShard::NBackup
\ No newline at end of file
diff --git a/ydb/core/tx/columnshard/backup/async_jobs/import_downloader.h b/ydb/core/tx/columnshard/backup/async_jobs/import_downloader.h
new file mode 100644
index 000000000000..3102f94cb4c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/backup/async_jobs/import_downloader.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+
+namespace NKikimr::NColumnShard::NBackup {
+
+TConclusion<std::unique_ptr<NActors::IActor>> CreateAsyncJobImportDownloader(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo);
+
+std::unique_ptr<NActors::IActor> CreateImportDownloaderImport(const NActors::TActorId& subscriberActorId, ui64 txId, const NKikimrSchemeOp::TRestoreTask& restoreTask, const NKikimr::NDataShard::TTableInfo& tableInfo);
+
+} // namespace NKikimr::NColumnShard::NBackup
\ No newline at end of file
diff --git a/ydb/core/tx/columnshard/backup/async_jobs/ut/ut_import_downloader.cpp b/ydb/core/tx/columnshard/backup/async_jobs/ut/ut_import_downloader.cpp
new file mode 100644
index 000000000000..a7daebd7ce8f
--- /dev/null
+++ b/ydb/core/tx/columnshard/backup/async_jobs/ut/ut_import_downloader.cpp
@@ -0,0 +1,149 @@
+#include
+#include
+
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace NKikimr {
+
+namespace {
+
+using TRuntimePtr = std::shared_ptr<TTestBasicRuntime>;
+
+// Builds a small two-column (key/value) Arrow batch used as export input.
+std::shared_ptr<arrow::RecordBatch> TestRecordBatch() {
+    std::vector<std::string> keys = {"foo", "bar", "baz"};
+    std::vector<std::string> values = {"one", "two", "three"};
+
+    arrow::StringBuilder key_builder;
+    for (const auto& k : keys) {
+        Y_UNUSED(key_builder.Append(k));
+    }
+    std::shared_ptr<arrow::Array> key_array;
+    Y_UNUSED(key_builder.Finish(&key_array));
+
+    arrow::StringBuilder value_builder;
+    for (const auto& v : values) {
+        Y_UNUSED(value_builder.Append(v));
+    }
+    std::shared_ptr<arrow::Array> value_array;
+    Y_UNUSED(value_builder.Finish(&value_array));
+
+    auto schema = arrow::schema({
+        arrow::field("key", arrow::binary()),
+        arrow::field("value", arrow::binary())
+    });
+
+    return arrow::RecordBatch::Make(schema, keys.size(), {key_array, value_array});
+}
+
+NDataShard::IExport::TTableColumns MakeYdbColumns() {
+    NDataShard::IExport::TTableColumns columns;
+    columns[0] = NDataShard::TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "key", true);
+    columns[1] = NDataShard::TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "value", false);
+    return columns;
+}
+
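+// Task builders for the two phases of the test: the backup task drives the
+// export into the bucket, the restore task drives the import back out of it.
+// Both point at the S3 recipe endpoint taken from the environment.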
+NKikimrSchemeOp::TBackupTask MakeBackupTask(const TString& bucketName) {
+    NKikimrSchemeOp::TBackupTask backupTask;
+    backupTask.SetEnablePermissions(true);
+    auto& s3Settings = *backupTask.MutableS3Settings();
+    s3Settings.SetBucket(bucketName);
+    s3Settings.SetEndpoint(GetEnv("S3_ENDPOINT"));
+    auto& table = *backupTask.MutableTable();
+    auto& tableDescription = *table.MutableColumnTableDescription();
+    tableDescription.SetColumnShardCount(4);
+    auto& col1 = *tableDescription.MutableSchema()->MutableColumns()->Add();
+    col1.SetName("key");
+    col1.SetType("String");
+
+    auto& col2 = *tableDescription.MutableSchema()->MutableColumns()->Add();
+    col2.SetName("value");
+    col2.SetType("String");
+    table.MutableSelf(); // ensure the Self field is present in the task
+    return backupTask;
+}
+
+NKikimrSchemeOp::TRestoreTask MakeRestoreTask(const TString& bucketName) {
+    NKikimrSchemeOp::TRestoreTask restoreTask;
+    auto& s3Settings = *restoreTask.MutableS3Settings();
+    s3Settings.SetBucket(bucketName);
+    s3Settings.SetEndpoint(GetEnv("S3_ENDPOINT"));
+    auto& description = *restoreTask.MutableTableDescription();
+    auto& col1 = *description.AddColumns();
+    col1.SetName("key");
+    col1.SetType("String");
+    col1.SetId(1);
+    col1.SetTypeId(NScheme::NTypeIds::String);
+    auto& col2 = *description.AddColumns();
+    col2.SetName("value");
+    col2.SetType("String");
+    col2.SetId(2);
+    col2.SetTypeId(NScheme::NTypeIds::String);
+    description.AddKeyColumnNames("key");
+    description.AddKeyColumnIds(1);
+    return restoreTask;
+}
+
+} // anonymous namespace
+
+using namespace NColumnShard;
+
+Y_UNIT_TEST_SUITE(IScan) {
+
+    Y_UNIT_TEST(MultiExport) {
+        Aws::S3::S3Client s3Client = NTestUtils::MakeS3Client();
+        NTestUtils::CreateBucket("test2", s3Client);
+
+        TRuntimePtr runtime(new TTestBasicRuntime());
+        runtime->SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG);
+        runtime->SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_DEBUG);
+        SetupTabletServices(*runtime);
+
+        const auto edge = runtime->AllocateEdgeActor(0);
+        auto exportFactory = std::make_shared();
+        auto actor = NKikimr::NColumnShard::NBackup::CreateExportUploaderActor(edge, MakeBackupTask("test2"), exportFactory.get(), MakeYdbColumns(), 0);
+        auto exporter = runtime->Register(actor.release());
+
+        TAutoPtr<IEventHandle> handle;
+        runtime->DispatchEvents({}, TDuration::Seconds(1));
+        runtime->Send(new IEventHandle(exporter, edge, new NColumnShard::TEvPrivate::TEvBackupExportRecordBatch(TestRecordBatch(), false)));
+        runtime->Send(new IEventHandle(exporter, edge, new NColumnShard::TEvPrivate::TEvBackupExportRecordBatch(TestRecordBatch(), true)));
+        auto event1 = runtime->GrabEdgeEvent(handle);
+        UNIT_ASSERT(!event1->IsFinish);
+        auto event2 = runtime->GrabEdgeEvent(handle);
+        UNIT_ASSERT(event2->IsFinish);
+
+        runtime->DispatchEvents({}, TDuration::Seconds(5));
+        std::vector<TString> result = NTestUtils::GetObjectKeys("test2", s3Client);
+        UNIT_ASSERT_VALUES_EQUAL(NTestUtils::GetUncommittedUploadsCount("test2", s3Client), 0);
+        UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", result), "data_00.csv,metadata.json,permissions.pb,scheme.pb");
+        auto scheme = NTestUtils::GetObject("test2", "scheme.pb", s3Client);
+        UNIT_ASSERT_VALUES_EQUAL(scheme, "columns {\n name: \"key\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\ncolumns {\n name: \"value\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\npartitioning_settings {\n min_partitions_count: 4\n}\nstore_type: STORE_TYPE_COLUMN\n");
+        auto metadata = NTestUtils::GetObject("test2", "metadata.json", s3Client);
+        UNIT_ASSERT_VALUES_EQUAL(metadata, "{\"version\":0,\"full_backups\":[{\"snapshot_vts\":[0,0]}],\"permissions\":1,\"changefeeds\":[]}");
+        auto data = NTestUtils::GetObject("test2", "data_00.csv", s3Client);
+        UNIT_ASSERT_VALUES_EQUAL(data, "\"foo\",\"one\"\n\"bar\",\"two\"\n\"baz\",\"three\"\n\"foo\",\"one\"\n\"bar\",\"two\"\n\"baz\",\"three\"\n");
+
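+        // Import phase: feed the freshly exported bucket back through the
+        // downloader actor built from the restore task.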
+        auto restoreTask = MakeRestoreTask("test2");
+        auto userTable = MakeIntrusiveConst<NDataShard::TUserTable>(ui32(0), restoreTask.GetTableDescription(), ui32(0));
+
+        auto importActor = NKikimr::NColumnShard::NBackup::CreateImportDownloaderImport(edge, 0, restoreTask, NKikimr::NDataShard::TTableInfo{0, userTable});
+        runtime->Register(importActor.release());
+        runtime->DispatchEvents({}, TDuration::Seconds(1));
+    }
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/tx/columnshard/backup/async_jobs/ut/ya.make b/ydb/core/tx/columnshard/backup/async_jobs/ut/ya.make
new file mode 100644
index 000000000000..d228cc8a6ad7
--- /dev/null
+++ b/ydb/core/tx/columnshard/backup/async_jobs/ut/ya.make
@@ -0,0 +1,27 @@
+UNITTEST_FOR(ydb/core/tx/columnshard/backup/async_jobs)
+
+PEERDIR(
+    library/cpp/getopt
+    library/cpp/regex/pcre
+    library/cpp/svnversion
+    ydb/apps/ydbd/export
+    ydb/core/testlib/default
+    ydb/core/tx
+    ydb/core/tx/columnshard/hooks/abstract
+    ydb/core/tx/columnshard/hooks/testing
+    ydb/core/tx/columnshard/test_helper
+    ydb/library/aclib/protos
+    ydb/library/testlib/s3_recipe_helper
+    ydb/public/lib/yson_value
+    ydb/services/metadata
+)
+
+YQL_LAST_ABI_VERSION()
+
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/s3_recipe/recipe.inc)
+
+SRCS(
+    ut_import_downloader.cpp
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/backup/async_jobs/ya.make b/ydb/core/tx/columnshard/backup/async_jobs/ya.make
new file mode 100644
index 000000000000..334ec4d85284
--- /dev/null
+++ b/ydb/core/tx/columnshard/backup/async_jobs/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+    import_downloader.cpp
+)
+
+PEERDIR(
+    ydb/core/formats/arrow
+    ydb/library/actors/core
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()