diff --git a/ydb/apps/ydbd/export/export.cpp b/ydb/apps/ydbd/export/export.cpp new file mode 100644 index 000000000000..3f85de70e33e --- /dev/null +++ b/ydb/apps/ydbd/export/export.cpp @@ -0,0 +1,27 @@ +#include "export.h" + +#include + +NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt( + const IExport::TTask& task, const IExport::TTableColumns& columns) const +{ + Y_UNUSED(task); + Y_UNUSED(columns); + return nullptr; // not supported +} + +NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3( + const IExport::TTask& task, const IExport::TTableColumns& columns) const +{ +#ifndef KIKIMR_DISABLE_S3_OPS + return new NKikimr::NDataShard::TS3Export(task, columns); +#else + Y_UNUSED(task); + Y_UNUSED(columns); + return nullptr; +#endif +} + +void TDataShardExportFactory::Shutdown() { + // No cleanup required for TDataShardExportFactory. +} diff --git a/ydb/apps/ydbd/export/export.h b/ydb/apps/ydbd/export/export.h new file mode 100644 index 000000000000..9d077f16aa5e --- /dev/null +++ b/ydb/apps/ydbd/export/export.h @@ -0,0 +1,12 @@ +#pragma once + +#include + +class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory { + using IExport = NKikimr::NDataShard::IExport; + +public: + IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; + IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; + void Shutdown() override; +}; diff --git a/ydb/apps/ydbd/export/ya.make b/ydb/apps/ydbd/export/ya.make new file mode 100644 index 000000000000..4b96c52c8e7b --- /dev/null +++ b/ydb/apps/ydbd/export/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + export.cpp +) + +PEERDIR( + yql/essentials/public/types + ydb/core/tx/columnshard/engines/scheme/defaults/protos + ydb/library/mkql_proto/protos + ydb/library/aclib/protos + ydb/library/formats/arrow/protos + ydb/core/tx/datashard +) + +END() diff --git a/ydb/apps/ydbd/main.cpp b/ydb/apps/ydbd/main.cpp index ecd14a20302a..95fd5791871c 100644 --- a/ydb/apps/ydbd/main.cpp +++ b/ydb/apps/ydbd/main.cpp @@ -1,4 +1,4 @@ -#include "export.h" +#include "export/export.h" #include #include #include diff --git a/ydb/apps/ydbd/ya.make b/ydb/apps/ydbd/ya.make index 52fc94066410..d43fbe7603d7 100644 --- a/ydb/apps/ydbd/ya.make +++ b/ydb/apps/ydbd/ya.make @@ -33,27 +33,29 @@ ENDIF() PEERDIR( ydb/apps/version + ydb/apps/ydbd/export ydb/core/driver_lib/run ydb/core/protos ydb/core/security ydb/core/tx/schemeshard ydb/core/ymq/actor ydb/core/ymq/base + ydb/library/breakpad ydb/library/folder_service/mock ydb/library/keys ydb/library/pdisk_io ydb/library/security + ydb/library/yql/udfs/common/clickhouse/client + ydb/library/yql/udfs/common/knn + ydb/library/yql/udfs/common/roaring yql/essentials/parser/pg_wrapper yql/essentials/sql/pg - ydb/library/yql/udfs/common/clickhouse/client yql/essentials/udfs/common/compress_base yql/essentials/udfs/common/datetime2 yql/essentials/udfs/common/digest yql/essentials/udfs/common/histogram yql/essentials/udfs/common/hyperloglog yql/essentials/udfs/common/ip_base - ydb/library/yql/udfs/common/knn - ydb/library/yql/udfs/common/roaring yql/essentials/udfs/common/json yql/essentials/udfs/common/json2 yql/essentials/udfs/common/math @@ -68,7 +70,6 @@ PEERDIR( yql/essentials/udfs/common/url_base yql/essentials/udfs/common/yson2 yql/essentials/udfs/logs/dsv - ydb/library/breakpad ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/columnshard/backup/iscan/iscan.cpp b/ydb/core/tx/columnshard/backup/iscan/iscan.cpp new file mode 100644 index 000000000000..4b611fe384c6 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/iscan/iscan.cpp @@ -0,0 +1,309 @@ +#include "iscan.h" + +#include +#include +#include + +namespace NKikimr::NColumnShard::NBackup { + +TRowWriter::TRowWriter(TVector& rows) + : Rows(rows) +{} + +void TRowWriter::AddRow(const TConstArrayRef& cells) { + TSerializedCellVec serializedKey(cells); + Rows.emplace_back(std::move(serializedKey)); +} + +TConclusion> BatchToRows(const std::shared_ptr& batch, + const TVector>& ydbSchema) { + Y_ABORT_UNLESS(batch); + TVector cellVecs; + cellVecs.reserve(batch->num_rows()); + TRowWriter writer(cellVecs); + NArrow::TArrowToYdbConverter batchConverter(ydbSchema, writer, false); + TString errorMessage; + if (!batchConverter.Process(*batch, errorMessage)) { + return TConclusionStatus::Fail(errorMessage); + } + return cellVecs; +} + +void TExportDriver::Touch(NTable::EScan state) { + ActorSystem->Send(SubscriberActorId, new TEvPrivate::TEvBackupExportState(state)); +} + +void TExportDriver::Throw(const std::exception& e) { + ActorSystem->Send(SubscriberActorId, new TEvPrivate::TEvBackupExportError(e.what())); +} + +ui64 TExportDriver::GetTotalCpuTimeUs() const { + return 0; +} + +TConclusion> CreateIScanExportUploader(const TActorId& subscriberActorId, const NKikimrSchemeOp::TBackupTask& backupTask, const NDataShard::IExportFactory* exportFactory, const NDataShard::IExport::TTableColumns& tableColumns, ui64 txId) { + std::shared_ptr<::NKikimr::NDataShard::IExport> exp; + switch (backupTask.GetSettingsCase()) { + case NKikimrSchemeOp::TBackupTask::kYTSettings: + if (backupTask.HasCompression()) { + return TConclusionStatus::Fail("Exports to YT do not support compression"); + } + if (exportFactory) { + exp = std::shared_ptr(exportFactory->CreateExportToYt(backupTask, tableColumns)); + } else { + return TConclusionStatus::Fail("Exports to YT are disabled"); + } + break; + case NKikimrSchemeOp::TBackupTask::kS3Settings: + NDataShard::NBackupRestoreTraits::ECompressionCodec codec; + if (!TryCodecFromTask(backupTask, codec)) { + return TConclusionStatus::Fail(TStringBuilder() << "Unsupported compression codec: " << backupTask.GetCompression().GetCodec()); + } + if (exportFactory) { + exp = std::shared_ptr(exportFactory->CreateExportToS3(backupTask, tableColumns)); + } else { + return TConclusionStatus::Fail("Exports to S3 are disabled"); + } + break; + case NKikimrSchemeOp::TBackupTask::SETTINGS_NOT_SET: + return TConclusionStatus::Fail("Internal error. It is not possible to have empty settings for backup here"); + } + + auto createUploader = [subscriberActorId = subscriberActorId, txId = txId, exp]() { + return exp->CreateUploader(subscriberActorId, txId); + }; + + THolder buffer{exp->CreateBuffer()}; + std::unique_ptr scan{NDataShard::CreateExportScan(std::move(buffer), createUploader)}; + + return scan; +} + +TExportDriver::TExportDriver(TActorSystem* actorSystem, const TActorId& subscriberActorId) + : ActorSystem(actorSystem) + , SubscriberActorId(subscriberActorId) { +} + +class TUploaderActor: public TActorBootstrapped { +public: + struct TBatchItem { + std::shared_ptr Data; + bool IsLast = false; + }; + + struct TCurrentBatchItem { + ui64 Position = 0; + TVector Batch; + bool IsLast = false; + bool NeedResult = false; + + void Clear() { + Batch.clear(); + Position = 0; + IsLast = false; + NeedResult = true; + } + + TSerializedCellVec& GetRow() { + return Batch[Position]; + } + + bool HasMoreRows() const { + return Position < Batch.size(); + } + }; + + TUploaderActor(const NKikimrSchemeOp::TBackupTask& backupTask, const NDataShard::IExportFactory* exportFactory, const NDataShard::IExport::TTableColumns& tableColumns, const TActorId& subscriberActorId, ui64 txId) + : BackupTask(backupTask) + , ExportFactory(exportFactory) + , TableColumns(tableColumns) + , SubscriberActorId(subscriberActorId) + , TxId(txId) + , ColumnTypes(MakeColumnTypes()) { + } + + void Bootstrap() { + auto exporter = NColumnShard::NBackup::CreateIScanExportUploader(SelfId(), BackupTask, ExportFactory, TableColumns, TxId); + if (!exporter.IsSuccess()) { + Fail(exporter.GetErrorMessage()); + return; + } + + Driver = std::make_unique(TActorContext::ActorSystem(), SelfId()); + Exporter = exporter.DetachResult().release(); + auto initialState = Exporter->Prepare(Driver.get(), MakeRowSchema()); + AFL_VERIFY(initialState.Scan == NTable::EScan::Feed); + + NTable::TLead lead; + auto seekState = Exporter->Seek(lead, 0); + AFL_VERIFY(seekState == NTable::EScan::Feed); + Become(&TThis::StateMain); + } + + void Fail(const TString& errorMessage) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("component", "TUploaderActor")("error", errorMessage); + Send(SubscriberActorId, new TEvPrivate::TEvBackupExportError(errorMessage)); + Exporter->Finish(NTable::EStatus::Done); + PassAway(); + } + + STRICT_STFUNC( + StateMain, + hFunc(TEvPrivate::TEvBackupExportError, Handle) + hFunc(TEvPrivate::TEvBackupExportState, Handle) + hFunc(TEvPrivate::TEvBackupExportRecordBatch, Handle) + ) + + void Handle(const TEvPrivate::TEvBackupExportRecordBatch::TPtr& ev) { + const auto& event = *ev.Get()->Get(); + DataQueue.emplace(event.Data, event.IsLast); + UploadData(); + } + + + void Handle(const TEvPrivate::TEvBackupExportError::TPtr& ev) { + Fail(ev.Get()->Get()->ErrorMessage); + } + + void UploadData() { + if (LastState == NTable::EScan::Sleep) { + return; + } + DrainQueue(); + SendResult(false); + MarkAsLast(); + } + + void SendResult(bool isFinal) { + if (isFinal) { + AFL_VERIFY(!CurrentBatch.HasMoreRows() && CurrentBatch.NeedResult); + } + + if (!CurrentBatch.HasMoreRows() && (isFinal || !CurrentBatch.IsLast) && CurrentBatch.NeedResult) { + Send(SubscriberActorId, new TEvPrivate::TEvBackupExportRecordBatchResult(isFinal)); + if (isFinal) { + Exporter->Finish(NTable::EStatus::Done); + PassAway(); + } + CurrentBatch.NeedResult = false; + } + } + + void DrainQueue() { + if (LastState == NTable::EScan::Sleep) { + return; + } + + while (true) { + if (DataQueue.empty() && !CurrentBatch.HasMoreRows() ) { + return; + } + if (!CurrentBatch.HasMoreRows()) { + auto batch = DataQueue.front(); + DataQueue.pop(); + SendResult(false); + + if (!batch.Data) { + CurrentBatch.Clear(); + CurrentBatch.IsLast = batch.IsLast; + SendResult(false); + continue; + } + + auto result = NColumnShard::NBackup::BatchToRows(batch.Data, ColumnTypes); + if (!result.IsSuccess()) { + Fail(result.GetErrorMessage()); + return; + } + + CurrentBatch.Clear(); + CurrentBatch.IsLast = batch.IsLast; + CurrentBatch.Batch = result.DetachResult(); + } + + FeedRowsToExporter(); + if (LastState == NTable::EScan::Sleep) { + return; + } + } + } + + void FeedRowsToExporter() { + while (CurrentBatch.HasMoreRows()) { + auto& row = CurrentBatch.GetRow(); + NTable::TRowState rowState(row.GetCells().size()); + int i = 0; + for (const auto& cell: row.GetCells()) { + rowState.Set(i++, { NTable::ECellOp::Set, NTable::ELargeObj::Inline }, cell); + } + auto result = Exporter->Feed({}, rowState); + CurrentBatch.Position++; + if (result == NTable::EScan::Sleep) { + LastState = NTable::EScan::Sleep; + return; + } + } + } + + void Handle(const TEvPrivate::TEvBackupExportState::TPtr& ev) { + const auto& event = *ev.Get()->Get(); + if (event.State == NTable::EScan::Final) { + SendResult(true); + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("component", "TUploaderActor")("reason", "successfully finished"); + return; + } + LastState = event.State; + UploadData(); + } + + void MarkAsLast() { + if (CurrentBatch.HasMoreRows()) { + return; + } + if (!DataQueue.empty()) { + return; + } + if (CurrentBatch.IsLast) { + LastState = Exporter->Exhausted(); + } + } + +private: + TIntrusiveConstPtr MakeRowSchema() { + THashMap columns; + for (const auto& column : TableColumns) { + columns[column.first] = NTable::TColumn(column.second.Name, column.first, column.second.Type, column.second.TypeMod); + columns[column.first].KeyOrder = column.first; + } + return NTable::TRowScheme::Make(columns, NUtil::TSecond()); + } + + TVector> MakeColumnTypes() { + TVector> columnTypes; + for (const auto& column : TableColumns) { + columnTypes.emplace_back(column.second.Name, column.second.Type); + } + return columnTypes; + } + + + +private: + TCurrentBatchItem CurrentBatch; + NTable::EScan LastState = NTable::EScan::Sleep; + std::queue DataQueue; + std::unique_ptr Driver; + NTable::IScan* Exporter = nullptr; + NKikimrSchemeOp::TBackupTask BackupTask; + const NDataShard::IExportFactory* ExportFactory; + NDataShard::IExport::TTableColumns TableColumns; + TActorId SubscriberActorId; + ui64 TxId; + TVector> ColumnTypes; +}; + +std::unique_ptr CreateExportUploaderActor(const TActorId& subscriberActorId, const NKikimrSchemeOp::TBackupTask& backupTask, const NDataShard::IExportFactory* exportFactory, const NDataShard::IExport::TTableColumns& tableColumns, ui64 txId) { + return std::make_unique(backupTask, exportFactory, tableColumns, subscriberActorId, txId); +} + +} // namespace NKikimr::NColumnShard::NBackup \ No newline at end of file diff --git a/ydb/core/tx/columnshard/backup/iscan/iscan.h b/ydb/core/tx/columnshard/backup/iscan/iscan.h new file mode 100644 index 000000000000..8051062d0352 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/iscan/iscan.h @@ -0,0 +1,40 @@ +#pragma once + +#include + +#include +#include +#include + + +namespace NKikimr::NColumnShard::NBackup { + +class TRowWriter : public NArrow::IRowWriter { +public: + explicit TRowWriter(TVector& rows); + void AddRow(const TConstArrayRef& cells) override; +private: + TVector& Rows; +}; + +struct TExportDriver final : NTable::IDriver { + TExportDriver(TActorSystem* actorSystem, const TActorId& subscriberActorId); + + virtual void Touch(NTable::EScan state) override; + + virtual void Throw(const std::exception& e) override; + + virtual ui64 GetTotalCpuTimeUs() const override; + +private: + NActors::TActorSystem* ActorSystem; + TActorId SubscriberActorId; +}; + +TConclusion> BatchToRows(const std::shared_ptr& batch, const TVector>& ydbSchema); + +TConclusion> CreateIScanExportUploader(const TActorId& subscriberActorId, const NKikimrSchemeOp::TBackupTask& backupTask, const NDataShard::IExportFactory* exportFactory, const NDataShard::IExport::TTableColumns& tableColumns, ui64 txId); + +std::unique_ptr CreateExportUploaderActor(const TActorId& subscriberActorId, const NKikimrSchemeOp::TBackupTask& backupTask, const NDataShard::IExportFactory* exportFactory, const NDataShard::IExport::TTableColumns& tableColumns, ui64 txId); + +} // namespace NKikimr::NColumnShard::NBackup \ No newline at end of file diff --git a/ydb/core/tx/columnshard/backup/iscan/ut/ut_iscan.cpp b/ydb/core/tx/columnshard/backup/iscan/ut/ut_iscan.cpp new file mode 100644 index 000000000000..b091c97cdaa9 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/iscan/ut/ut_iscan.cpp @@ -0,0 +1,290 @@ +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + + +namespace NKikimr { + +namespace { + +std::shared_ptr TestRecordBatch() { + std::vector keys = {"foo", "bar", "baz"}; + std::vector values = {"one", "two", "three"}; + + arrow::StringBuilder key_builder; + for (const auto& k : keys) { + Y_UNUSED(key_builder.Append(k)); + } + std::shared_ptr key_array; + Y_UNUSED(key_builder.Finish(&key_array)); + + arrow::StringBuilder value_builder; + for (const auto& v : values) { + Y_UNUSED(value_builder.Append(v)); + } + std::shared_ptr value_array; + Y_UNUSED(value_builder.Finish(&value_array)); + + auto schema = arrow::schema({ + arrow::field("key", arrow::binary()), + arrow::field("value", arrow::binary()) + }); + + return arrow::RecordBatch::Make(schema, keys.size(), {key_array, value_array}); +} + +TVector> MakeYdbSchema() { + return {{"key", NScheme::TTypeInfo(NScheme::NTypeIds::String)}, {"value", NScheme::TTypeInfo(NScheme::NTypeIds::String)}}; +} + +TIntrusiveConstPtr MakeSchema() { + NTable::TScheme::TTableSchema tableSchema; + tableSchema.Columns[0] = NTable::TColumn("key", 0, NScheme::TTypeInfo(NScheme::NTypeIds::String), ""); + tableSchema.Columns[0].KeyOrder = 0; + + tableSchema.Columns[1] = NTable::TColumn("value", 1, NScheme::TTypeInfo(NScheme::NTypeIds::String), ""); + tableSchema.Columns[1].KeyOrder = 1; + + return NTable::TRowScheme::Make(tableSchema.Columns, NUtil::TSecond()); +} + +NDataShard::IExport::TTableColumns MakeYdbColumns() { + NDataShard::IExport::TTableColumns columns; + columns[0] = NDataShard::TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "key", true); + columns[1] = NDataShard::TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "value", false); + return columns; +} + +NKikimrSchemeOp::TBackupTask MakeBackupTask(const TString& bucketName) { + NKikimrSchemeOp::TBackupTask backupTask; + backupTask.SetEnablePermissions(true); + auto& s3Settings = *backupTask.MutableS3Settings(); + s3Settings.SetBucket(bucketName); + s3Settings.SetEndpoint(GetEnv("S3_ENDPOINT")); + auto& table = *backupTask.MutableTable(); + auto& tableDescription = *table.MutableColumnTableDescription(); + tableDescription.SetColumnShardCount(4); + auto& col1 = *tableDescription.MutableSchema()->MutableColumns()->Add(); + col1.SetName("key"); + col1.SetType("String"); + + auto& col2 = *tableDescription.MutableSchema()->MutableColumns()->Add(); + col2.SetName("value"); + col2.SetType("String"); + table.MutableSelf(); + return backupTask; +} + +using TRuntimePtr = std::shared_ptr; + +class TGrabActor: public TActorBootstrapped { + std::deque>> Futures; + std::deque> Inputs; + TMutex Mutex; + std::unique_ptr Driver; + std::unique_ptr Exporter; + +public: + TRuntimePtr Runtime; + + TGrabActor(TRuntimePtr runtime) + : Runtime(runtime) + { } + + void Bootstrap() { + NDataShard::IExport::TTableColumns columns; + columns[0] = NDataShard::TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "key", true); + columns[1] = NDataShard::TUserTable::TUserColumn(NScheme::TTypeInfo(NScheme::NTypeIds::String), "", "value", false); + + auto exportFactory = std::make_shared(); + + + Exporter = NColumnShard::NBackup::CreateIScanExportUploader(SelfId(), MakeBackupTask("test"), exportFactory.get(), columns, 0).DetachResult(); + UNIT_ASSERT(Exporter); + Driver = std::make_unique(TActorContext::ActorSystem(), SelfId()); + auto initialState = Exporter->Prepare(Driver.get(), MakeSchema()); + UNIT_ASSERT_VALUES_EQUAL(initialState.Scan, NTable::EScan::Feed); + + NTable::TLead lead; + auto seekState = Exporter->Seek(lead, 0); + UNIT_ASSERT_VALUES_EQUAL(seekState, NTable::EScan::Feed); + + Become(&TGrabActor::StateFunc); + } + + void SendData() { + auto recordBatch = TestRecordBatch(); + TVector cellVec = NColumnShard::NBackup::BatchToRows(recordBatch, MakeYdbSchema()).DetachResult(); + for (const auto& row: cellVec) { + NTable::TRowState rowState(row.GetCells().size()); + int i = 0; + for (const auto& cell: row.GetCells()) { + rowState.Set(i++, { NTable::ECellOp::Set, NTable::ELargeObj::Inline }, cell); + } + Exporter->Feed({}, rowState); + } + auto exhaustedState = Exporter->Exhausted(); + UNIT_ASSERT_VALUES_EQUAL(exhaustedState, NTable::EScan::Sleep); + } + + void Handle(NColumnShard::TEvPrivate::TEvBackupExportState::TPtr& ev) { + if (ev->Get()->State == NTable::EScan::Final) { + return; + } + SendData(); + } + + STFUNC(StateFunc) + { + if (ev->GetTypeRewrite() == NColumnShard::TEvPrivate::TEvBackupExportState::EventType) { + NColumnShard::TEvPrivate::TEvBackupExportState::TPtr* x = reinterpret_cast(&ev); + Handle(*x); + } + + TGuard lock(Mutex); + if (!Futures.empty()) { + auto front = Futures.front(); + Futures.pop_front(); + front.SetValue(ev); + return; + } + Inputs.push_back(ev); + } + + NThreading::TFuture> WaitRequest() + { + TGuard lock(Mutex); + if (!Inputs.empty()) { + auto front = Inputs.front(); + Inputs.pop_front(); + return NThreading::MakeFuture(front); + } + Futures.push_back(NThreading::NewPromise>()); + return Futures.back(); + } + + TAutoPtr GetRequest() + { + auto future = WaitRequest(); + while (!future.HasValue()) { + Runtime->DispatchEvents({}, TDuration::MilliSeconds(1)); + } + return future.GetValue(); + } +}; + +} + +using namespace NColumnShard; + +Y_UNIT_TEST_SUITE(IScan) { + Y_UNIT_TEST(SimpleExport) { + Aws::S3::S3Client s3Client = NTestUtils::MakeS3Client(); + NTestUtils::CreateBucket("test", s3Client); + + TRuntimePtr runtime(new TTestBasicRuntime(1, true)); + runtime->SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG); + SetupTabletServices(*runtime); + + auto grabActor = new TGrabActor(runtime); + runtime->Register(grabActor); + + while (true) { + auto request = grabActor->GetRequest(); + auto event = request->Get(); + UNIT_ASSERT_C(event, request->GetTypeName()); + if (event->State == NTable::EScan::Final) { + break; + } + } + + std::vector result = NTestUtils::GetObjectKeys("test", s3Client); + UNIT_ASSERT_VALUES_EQUAL(NTestUtils::GetUncommittedUploadsCount("test", s3Client), 0); + UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", result), "data_00.csv,metadata.json,permissions.pb,scheme.pb"); + auto scheme = NTestUtils::GetObject("test", "scheme.pb", s3Client); + UNIT_ASSERT_VALUES_EQUAL(scheme, "columns {\n name: \"key\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\ncolumns {\n name: \"value\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\npartitioning_settings {\n min_partitions_count: 4\n}\nstore_type: STORE_TYPE_COLUMN\n"); + auto metadata = NTestUtils::GetObject("test", "metadata.json", s3Client); + UNIT_ASSERT_VALUES_EQUAL(metadata, "{\"version\":0,\"full_backups\":[{\"snapshot_vts\":[0,0]}],\"permissions\":1,\"changefeeds\":[]}"); + auto data = NTestUtils::GetObject("test", "data_00.csv", s3Client); + UNIT_ASSERT_VALUES_EQUAL(data, "\"foo\",\"one\"\n\"bar\",\"two\"\n\"baz\",\"three\"\n"); + } + + Y_UNIT_TEST(UploaderExport) { + Aws::S3::S3Client s3Client = NTestUtils::MakeS3Client(); + NTestUtils::CreateBucket("test1", s3Client); + + TRuntimePtr runtime(new TTestBasicRuntime()); + runtime->SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG); + SetupTabletServices(*runtime); + + const auto edge = runtime->AllocateEdgeActor(0); + auto exportFactory = std::make_shared(); + auto actor = NKikimr::NColumnShard::NBackup::CreateExportUploaderActor(edge, MakeBackupTask("test1"), exportFactory.get(), MakeYdbColumns(), 0); + auto exporter = runtime->Register(actor.release()); + + TAutoPtr handle; + runtime->DispatchEvents({}, TDuration::Seconds(1)); + runtime->Send(new IEventHandle(exporter, edge, new NColumnShard::TEvPrivate::TEvBackupExportRecordBatch(TestRecordBatch(), true))); + auto event = runtime->GrabEdgeEvent(handle); + UNIT_ASSERT(event->IsFinish); + + runtime->DispatchEvents({}, TDuration::Seconds(5)); + std::vector result = NTestUtils::GetObjectKeys("test1", s3Client); + UNIT_ASSERT_VALUES_EQUAL(NTestUtils::GetUncommittedUploadsCount("test1", s3Client), 0); + UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", result), "data_00.csv,metadata.json,permissions.pb,scheme.pb"); + auto scheme = NTestUtils::GetObject("test1", "scheme.pb", s3Client); + UNIT_ASSERT_VALUES_EQUAL(scheme, "columns {\n name: \"key\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\ncolumns {\n name: \"value\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\npartitioning_settings {\n min_partitions_count: 4\n}\nstore_type: STORE_TYPE_COLUMN\n"); + auto metadata = NTestUtils::GetObject("test1", "metadata.json", s3Client); + UNIT_ASSERT_VALUES_EQUAL(metadata, "{\"version\":0,\"full_backups\":[{\"snapshot_vts\":[0,0]}],\"permissions\":1,\"changefeeds\":[]}"); + auto data = NTestUtils::GetObject("test1", "data_00.csv", s3Client); + UNIT_ASSERT_VALUES_EQUAL(data, "\"foo\",\"one\"\n\"bar\",\"two\"\n\"baz\",\"three\"\n"); + } + + Y_UNIT_TEST(MultiExport) { + Aws::S3::S3Client s3Client = NTestUtils::MakeS3Client(); + NTestUtils::CreateBucket("test2", s3Client); + + TRuntimePtr runtime(new TTestBasicRuntime()); + runtime->SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_DEBUG); + SetupTabletServices(*runtime); + + const auto edge = runtime->AllocateEdgeActor(0); + auto exportFactory = std::make_shared(); + auto actor = NKikimr::NColumnShard::NBackup::CreateExportUploaderActor(edge, MakeBackupTask("test2"), exportFactory.get(), MakeYdbColumns(), 0); + auto exporter = runtime->Register(actor.release()); + + TAutoPtr handle; + runtime->DispatchEvents({}, TDuration::Seconds(1)); + runtime->Send(new IEventHandle(exporter, edge, new NColumnShard::TEvPrivate::TEvBackupExportRecordBatch(TestRecordBatch(), false))); + runtime->Send(new IEventHandle(exporter, edge, new NColumnShard::TEvPrivate::TEvBackupExportRecordBatch(TestRecordBatch(), true))); + auto event1 = runtime->GrabEdgeEvent(handle); + UNIT_ASSERT(!event1->IsFinish); + auto event2 = runtime->GrabEdgeEvent(handle); + UNIT_ASSERT(event2->IsFinish); + + runtime->DispatchEvents({}, TDuration::Seconds(5)); + std::vector result = NTestUtils::GetObjectKeys("test2", s3Client); + UNIT_ASSERT_VALUES_EQUAL(NTestUtils::GetUncommittedUploadsCount("test2", s3Client), 0); + UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", result), "data_00.csv,metadata.json,permissions.pb,scheme.pb"); + auto scheme = NTestUtils::GetObject("test2", "scheme.pb", s3Client); + UNIT_ASSERT_VALUES_EQUAL(scheme, "columns {\n name: \"key\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\ncolumns {\n name: \"value\"\n type {\n optional_type {\n item {\n type_id: STRING\n }\n }\n }\n}\npartitioning_settings {\n min_partitions_count: 4\n}\nstore_type: STORE_TYPE_COLUMN\n"); + auto metadata = NTestUtils::GetObject("test2", "metadata.json", s3Client); + UNIT_ASSERT_VALUES_EQUAL(metadata, "{\"version\":0,\"full_backups\":[{\"snapshot_vts\":[0,0]}],\"permissions\":1,\"changefeeds\":[]}"); + auto data = NTestUtils::GetObject("test2", "data_00.csv", s3Client); + UNIT_ASSERT_VALUES_EQUAL(data, "\"foo\",\"one\"\n\"bar\",\"two\"\n\"baz\",\"three\"\n\"foo\",\"one\"\n\"bar\",\"two\"\n\"baz\",\"three\"\n"); + } +} + +} // namespace NKikimr diff --git a/ydb/core/tx/columnshard/backup/iscan/ut/ya.make b/ydb/core/tx/columnshard/backup/iscan/ut/ya.make new file mode 100644 index 000000000000..2da1aec0971e --- /dev/null +++ b/ydb/core/tx/columnshard/backup/iscan/ut/ya.make @@ -0,0 +1,27 @@ +UNITTEST_FOR(ydb/core/tx/columnshard/backup/iscan) + +PEERDIR( + library/cpp/getopt + library/cpp/regex/pcre + library/cpp/svnversion + ydb/apps/ydbd/export + ydb/core/testlib/default + ydb/core/tx + ydb/core/tx/columnshard/hooks/abstract + ydb/core/tx/columnshard/hooks/testing + ydb/core/tx/columnshard/test_helper + ydb/library/aclib/protos + ydb/public/lib/yson_value + ydb/services/metadata + ydb/library/testlib/s3_recipe_helper +) + +YQL_LAST_ABI_VERSION() + +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/s3_recipe/recipe.inc) + +SRCS( + ut_iscan.cpp +) + +END() diff --git a/ydb/core/tx/columnshard/backup/iscan/ya.make b/ydb/core/tx/columnshard/backup/iscan/ya.make new file mode 100644 index 000000000000..751395130ae4 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/iscan/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + iscan.cpp +) + +PEERDIR( + ydb/core/formats/arrow + ydb/library/actors/core +) + +END() + +RECURSE_FOR_TESTS( + ut +) \ No newline at end of file diff --git a/ydb/core/tx/columnshard/backup/ya.make b/ydb/core/tx/columnshard/backup/ya.make new file mode 100644 index 000000000000..78bae5ba1e45 --- /dev/null +++ b/ydb/core/tx/columnshard/backup/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +PEERDIR( + ydb/core/tx/columnshard/backup/iscan +) + +END() diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index a7e60ef56c90..279124253417 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -81,10 +81,15 @@ struct TEvPrivate { EvRequestFilter, EvFilterRequestResourcesAllocated, EvFilterConstructionResult, - + EvReportScanDiagnostics, EvReportScanIteratorDiagnostics, + EvBackupExportRecordBatch, + EvBackupExportRecordBatchResult, + EvBackupExportState, + EvBackupExportError, + EvEnd }; @@ -354,7 +359,7 @@ struct TEvPrivate { return WritesBuffer; } }; - + struct TEvReportScanDiagnostics: public TEventLocal { TEvReportScanDiagnostics(TString&& requestMessage, TString&& dotGraph, TString&& ssaProgram, TString&& pkRangesFilter, bool isPublicScan) : RequestMessage(std::move(requestMessage)) @@ -381,6 +386,47 @@ struct TEvPrivate { ui64 RequestId; TString ScanIteratorDiagnostics; }; + + // *** Backup *** + /* + 1. TEvBackupExportRecordBatch -> Uploader + 2. TEvBackupExportRecordBatchResult | TEvBackupExportError <- Uploader + ---- + 3. TEvBackupExportState - internal message for iscan iface + */ + struct TEvBackupExportRecordBatch: public TEventLocal { + explicit TEvBackupExportRecordBatch(const std::shared_ptr& data, bool isLast) + : Data(data) + , IsLast(isLast) { + } + + std::shared_ptr Data; + bool IsLast; + }; + + struct TEvBackupExportRecordBatchResult: public TEventLocal { + explicit TEvBackupExportRecordBatchResult(bool isFinish) + : IsFinish(isFinish) { + } + + bool IsFinish = false; + }; + + struct TEvBackupExportState: public TEventLocal { + explicit TEvBackupExportState(NTable::EScan state) + : State(state) { + } + + NTable::EScan State; + }; + + struct TEvBackupExportError: public TEventLocal { + explicit TEvBackupExportError(const TString& errorMessage) + : ErrorMessage(errorMessage) { + } + + TString ErrorMessage; + }; }; } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index d09c8941b47f..5a8599bcec88 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -41,6 +41,7 @@ PEERDIR( ydb/core/protos ydb/core/tablet ydb/core/tablet_flat + ydb/core/tx/columnshard/backup ydb/core/tx/columnshard/blobs_action ydb/core/tx/columnshard/blobs_action/storages_manager ydb/core/tx/columnshard/blobs_reader diff --git a/ydb/core/tx/datashard/export_common.cpp b/ydb/core/tx/datashard/export_common.cpp index c0a1f327ed7c..52857ab3feab 100644 --- a/ydb/core/tx/datashard/export_common.cpp +++ b/ydb/core/tx/datashard/export_common.cpp @@ -24,11 +24,8 @@ static void ResortColumns( return it->second; }); } - -TMaybe GenYdbScheme( - const TMap& columns, - const NKikimrSchemeOp::TPathDescription& pathDesc) -{ +static TMaybe GenRowTableScheme(const TMap& columns, + const NKikimrSchemeOp::TPathDescription& pathDesc) { if (!pathDesc.HasTable()) { return Nothing(); } @@ -71,6 +68,45 @@ TMaybe GenYdbScheme( return scheme; } +static TMaybe GenColumnTableScheme(const TMap& columns, + const NKikimrSchemeOp::TPathDescription& pathDesc) { + if (!pathDesc.HasColumnTableDescription()) { + return Nothing(); + } + + Ydb::Table::CreateTableRequest scheme; + + const auto& tableDesc = pathDesc.GetColumnTableDescription(); + NKikimrMiniKQL::TType mkqlKeyType; + + try { + FillColumnDescription(scheme, tableDesc); + } catch (const yexception&) { + return Nothing(); + } + + ResortColumns(*scheme.mutable_columns(), columns); + + FillColumnFamilies(scheme, tableDesc); + FillAttributes(scheme, pathDesc); + FillPartitioningSettings(scheme, tableDesc); + + return scheme; +} + +TMaybe GenYdbScheme( + const TMap& columns, + const NKikimrSchemeOp::TPathDescription& pathDesc) +{ + if (pathDesc.HasTable()) { + return GenRowTableScheme(columns, pathDesc); + } + if (pathDesc.HasColumnTableDescription()) { + return GenColumnTableScheme(columns, pathDesc); + } + return Nothing(); +} + TMaybe GenYdbPermissions(const NKikimrSchemeOp::TPathDescription& pathDesc) { if (!pathDesc.HasSelf()) { return Nothing(); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index b882896a8ca5..3039cd8e2d11 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -1013,6 +1013,17 @@ void FillPartitioningSettingsImpl(TYdbProto& out, FillPartitioningSettings(outPartSettings, partConfig.GetPartitioningPolicy()); } +template +void FillPartitioningSettingsImpl(TYdbProto& out, + const NKikimrSchemeOp::TColumnTableDescription& in) { + + auto& outPartSettings = *out.mutable_partitioning_settings(); + + if (in.HasColumnShardCount()) { + outPartSettings.set_min_partitions_count(in.GetColumnShardCount()); + } +} + void FillGlobalIndexSettings(Ydb::Table::GlobalIndexSettings& settings, const NKikimrSchemeOp::TTableDescription& indexImplTableDescription ) { @@ -1100,7 +1111,7 @@ void FillIndexDescriptionImpl(TYdbProto& out, const NKikimrSchemeOp::TTableDescr break; default: Y_DEBUG_ABORT_S(NTableIndex::InvalidIndexType(tableIndex.GetType())); - + break; }; @@ -1170,7 +1181,7 @@ bool FillIndexDescription(NKikimrSchemeOp::TIndexedTableCreationConfig& out, indexDesc->SetType(NKikimrSchemeOp::EIndexType::EIndexTypeGlobalVectorKmeansTree); *indexDesc->MutableVectorIndexKmeansTreeDescription()->MutableSettings() = index.global_vector_kmeans_tree_index().vector_settings(); break; - + case Ydb::Table::TableIndex::kGlobalFulltextIndex: indexDesc->SetType(NKikimrSchemeOp::EIndexType::EIndexTypeGlobalFulltext); *indexDesc->MutableFulltextIndexDescription()->MutableSettings() = index.global_fulltext_index().fulltext_settings(); @@ -1605,6 +1616,11 @@ void FillPartitioningSettings(Ydb::Table::CreateTableRequest& out, FillPartitioningSettingsImpl(out, in); } +void FillPartitioningSettings(Ydb::Table::CreateTableRequest& out, + const NKikimrSchemeOp::TColumnTableDescription& in) { + FillPartitioningSettingsImpl(out, in); +} + bool CopyExplicitPartitions(NKikimrSchemeOp::TTableDescription& out, const Ydb::Table::ExplicitPartitions& in, Ydb::StatusIds::StatusCode& status, TString& error) { diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 412e4153ef07..f5b9029e7f4d 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -128,6 +128,8 @@ void FillPartitioningSettings(Ydb::Table::DescribeTableResult& out, const NKikimrSchemeOp::TTableDescription& in); void FillPartitioningSettings(Ydb::Table::CreateTableRequest& out, const NKikimrSchemeOp::TTableDescription& in); +void FillPartitioningSettings(Ydb::Table::CreateTableRequest& out, + const NKikimrSchemeOp::TColumnTableDescription& in); // in bool CopyExplicitPartitions(NKikimrSchemeOp::TTableDescription& out,