Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
case NKqpProto::TKqpPhyConnection::kResult:
case NKqpProto::TKqpPhyConnection::kValue:
case NKqpProto::TKqpPhyConnection::kMerge:
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup:
case NKqpProto::TKqpPhyConnection::TYPE_NOT_SET:
break;
}
Expand Down
12 changes: 7 additions & 5 deletions ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/kqp/runtime/kqp_compute.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
#include <ydb/core/kqp/runtime/kqp_write_actor.h>
#include <ydb/core/kqp/runtime/kqp_read_table.h>
#include <ydb/core/kqp/runtime/kqp_sequencer_factory.h>
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
#include <ydb/core/kqp/runtime/kqp_vector_actor.h>
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
#include <ydb/core/kqp/runtime/kqp_write_actor.h>
#include <ydb/library/formats/arrow/protos/ssa.pb.h>
#include <ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup_factory.h>
#include <ydb/library/yql/dq/comp_nodes/dq_block_hash_join.h>
#include <ydb/library/yql/dq/comp_nodes/dq_hash_combine.h>
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
#include <ydb/library/yql/dq/comp_nodes/dq_block_hash_join.h>
#include <ydb/library/yql/dq/comp_nodes/dq_hash_combine.h>
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>

namespace NKikimr {
namespace NMiniKQL {
Expand Down Expand Up @@ -90,6 +91,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
RegisterKqpWriteActor(*factory, counters);
RegisterSequencerActorFactory(*factory, counters);
RegisterKqpVectorResolveActor(*factory, counters);
NYql::NDq::RegisterDqInputTransformLookupActorFactory(*factory);

if (federatedQuerySetup) {
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ PEERDIR(
ydb/library/formats/arrow/protos
ydb/library/formats/arrow/common
ydb/library/yql/dq/actors/compute
ydb/library/yql/dq/actors/input_transforms
ydb/library/yql/dq/comp_nodes
ydb/library/yql/providers/generic/actors
ydb/library/yql/providers/pq/async_io
ydb/library/yql/providers/s3/actors_factory
ydb/library/yql/providers/solomon/actors
yql/essentials/public/issue
ydb/library/yql/dq/comp_nodes
)

GENERATE_ENUM_SERIALIZATION(kqp_compute_state.h)
Expand Down
50 changes: 50 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,47 @@ void TKqpTasksGraph::BuildVectorResolveChannels(const TStageInfo& stageInfo, ui3
inputStageInfo, outputIndex, enableSpilling, logFunc);
}

// Attaches a "StreamLookupInputTransform" to input `inputIndex` of every task of
// `stageInfo` and then wires that input to `inputStageInfo` with union-all channels.
//
// The transform settings (TDqInputTransformLookupSettings) are copied field by field
// from the physical connection description `dqSourceStreamLookup`.
void TKqpTasksGraph::BuildDqSourceStreamLookupChannels(const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo,
    ui32 outputIndex, const NKqpProto::TKqpPhyCnDqSourceStreamLookup& dqSourceStreamLookup, const TChannelLogFunc& logFunc) {
    // Only a stage with exactly one task is accepted here.
    YQL_ENSURE(stageInfo.Tasks.size() == 1);

    // The settings message is obtained from the graph meta allocator.
    auto& lookupSettings = *GetMeta().Allocate<NDqProto::TDqInputTransformLookupSettings>();

    // Join labels and join kind.
    lookupSettings.SetLeftLabel(dqSourceStreamLookup.GetLeftLabel());
    lookupSettings.SetRightLabel(dqSourceStreamLookup.GetRightLabel());
    lookupSettings.SetJoinType(dqSourceStreamLookup.GetJoinType());

    // Row types seen on the connection itself.
    lookupSettings.SetNarrowInputRowType(dqSourceStreamLookup.GetConnectionInputRowType());
    lookupSettings.SetNarrowOutputRowType(dqSourceStreamLookup.GetConnectionOutputRowType());

    // Cache / buffering / multiget knobs are forwarded unchanged.
    lookupSettings.SetCacheLimit(dqSourceStreamLookup.GetCacheLimit());
    lookupSettings.SetCacheTtlSeconds(dqSourceStreamLookup.GetCacheTtlSeconds());
    lookupSettings.SetMaxDelayedRows(dqSourceStreamLookup.GetMaxDelayedRows());
    lookupSettings.SetIsMultiget(dqSourceStreamLookup.GetIsMultiGet());

    // Join key names for both sides.
    const auto& leftKeyNames = dqSourceStreamLookup.GetLeftJoinKeyNames();
    lookupSettings.MutableLeftJoinKeyNames()->Assign(leftKeyNames.begin(), leftKeyNames.end());

    const auto& rightKeyNames = dqSourceStreamLookup.GetRightJoinKeyNames();
    lookupSettings.MutableRightJoinKeyNames()->Assign(rightKeyNames.begin(), rightKeyNames.end());

    // Description of the right-hand (lookup) source: row type, provider name and
    // the provider-specific settings taken from the compiled lookup source.
    const auto& compiledLookupSource = dqSourceStreamLookup.GetLookupSource();
    auto& rightSource = *lookupSettings.MutableRightSource();
    rightSource.SetSerializedRowType(dqSourceStreamLookup.GetLookupRowType());
    rightSource.SetProviderName(compiledLookupSource.GetType());
    *rightSource.MutableLookupSource() = compiledLookupSource.GetSettings();

    TTransform lookupTransform = {
        .Type = "StreamLookupInputTransform",
        .InputType = dqSourceStreamLookup.GetInputStageRowType(),
        .OutputType = dqSourceStreamLookup.GetOutputStageRowType(),
    };
    YQL_ENSURE(lookupTransform.Settings.PackFrom(lookupSettings));

    for (const auto stageTaskId : stageInfo.Tasks) {
        auto& taskInput = GetTask(stageTaskId).Inputs[inputIndex];
        taskInput.Transform = lookupTransform;
    }

    // Channels for this connection are always built with spilling disabled.
    BuildUnionAllChannels(*this, stageInfo, inputIndex, inputStageInfo, outputIndex, /* enableSpilling */ false, logFunc);
}

void TKqpTasksGraph::BuildKqpStageChannels(TStageInfo& stageInfo, ui64 txId, bool enableSpilling, bool enableShuffleElimination) {
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);

Expand Down Expand Up @@ -709,6 +750,12 @@ void TKqpTasksGraph::BuildKqpStageChannels(TStageInfo& stageInfo, ui64 txId, boo
break;
}

case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup: {
BuildDqSourceStreamLookupChannels(stageInfo, inputIdx, inputStageInfo, outputIdx,
input.GetDqSourceStreamLookup(), log);
break;
}

default:
YQL_ENSURE(false, "Unexpected stage input type: " << (ui32)input.GetTypeCase());
}
Expand Down Expand Up @@ -1369,6 +1416,8 @@ void TKqpTasksGraph::FillInputDesc(NYql::NDqProto::TTaskInput& inputDesc, const
}

transformProto->MutableSettings()->PackFrom(*input.Meta.VectorResolveSettings);
} else {
*transformProto->MutableSettings() = input.Transform->Settings;
}
}
}
Expand Down Expand Up @@ -1724,6 +1773,7 @@ bool TKqpTasksGraph::BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCo
case NKqpProto::TKqpPhyConnection::kMap:
case NKqpProto::TKqpPhyConnection::kParallelUnionAll:
case NKqpProto::TKqpPhyConnection::kVectorResolve:
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup:
break;
default:
YQL_ENSURE(false, "Unexpected connection type: " << (ui32)input.GetTypeCase() << Endl);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ class TKqpTasksGraph : public NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMet
void BuildVectorResolveChannels(const TStageInfo& stageInfo, ui32 inputIndex,
const TStageInfo& inputStageInfo, ui32 outputIndex,
const NKqpProto::TKqpPhyCnVectorResolve& vectorResolve, bool enableSpilling, const NYql::NDq::TChannelLogFunc& logFunc);
void BuildDqSourceStreamLookupChannels(const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo,
ui32 outputIndex, const NKqpProto::TKqpPhyCnDqSourceStreamLookup& dqSourceStreamLookup, const NYql::NDq::TChannelLogFunc& logFunc);

void FillOutputDesc(NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, ui32 outputIdx,
bool enableSpilling, const TStageInfo& stageInfo) const;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,7 @@ class TKqpHost : public IKqpHost {
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
TypesCtx->AddDataSource(ConfigProviderName, configProvider);
TypesCtx->MatchRecognize = QueryServiceConfig.GetEnableMatchRecognize();
TypesCtx->StreamLookupJoin = true;

YQL_ENSURE(TypesCtx->Initialize(*ExprCtx));

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeSortToTopSort));
AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInToEquiJoin));
AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInCompactToJoin));
AddHandler(0, &TCoEquiJoin::Match, HNDL(RewriteStreamEquiJoinWithLookup));
AddHandler(0, &TCoEquiJoin::Match, HNDL(OptimizeEquiJoinWithCosts));
AddHandler(0, &TCoEquiJoin::Match, HNDL(RewriteEquiJoin));
AddHandler(0, &TDqJoin::Match, HNDL(JoinToIndexLookup));
Expand Down Expand Up @@ -167,6 +168,12 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

// Optimizer handler for TCoEquiJoin: forwards the node to
// DqRewriteStreamEquiJoinWithLookup and passes the input/output pair to
// DumpAppliedRule under the name "KqpRewriteStreamEquiJoinWithLookup".
TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
    TExprBase rewritten = DqRewriteStreamEquiJoinWithLookup(node, ctx, TypesCtx);

    DumpAppliedRule("KqpRewriteStreamEquiJoinWithLookup", node.Ptr(), rewritten.Ptr(), ctx);

    return rewritten;
}

TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
auto maxDPhypDPTableSize = Config->MaxDPHypDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPHypDPTableSize);
auto optLevel = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel);
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoExtendBase::Match, HNDL(BuildExtendStage));
AddHandler(0, &TDqJoin::Match, HNDL(RewriteRightJoinToLeft));
AddHandler(0, &TDqJoin::Match, HNDL(RewriteLeftPureJoin<false>));
AddHandler(0, &TDqJoin::Match, HNDL(RewriteStreamLookupJoin));
AddHandler(0, &TDqJoin::Match, HNDL(BuildJoin<false>));
AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecompute));
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
Expand Down Expand Up @@ -507,6 +508,14 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

// Optimizer handler for TDqJoin: forwards the node to DqRewriteStreamLookupJoin.
// DumpAppliedRule is called only when the rewrite returned a non-empty node;
// an empty result is returned to the caller as is.
TMaybeNode<TExprBase> RewriteStreamLookupJoin(TExprBase node, TExprContext& ctx) {
    TMaybeNode<TExprBase> rewritten = DqRewriteStreamLookupJoin(node, ctx);
    if (!rewritten) {
        return rewritten;
    }

    DumpAppliedRule("RewriteStreamLookupJoin", node.Ptr(), rewritten.Cast().Ptr(), ctx);
    return rewritten;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,8 @@ class TKikimrDataSource : public TDataProviderBase {
node.IsCallable(TDqReadWrap::CallableName()) ||
node.IsCallable(TDqReadWideWrap::CallableName()) ||
node.IsCallable(TDqReadBlockWideWrap::CallableName()) ||
node.IsCallable(TDqSource::CallableName())
node.IsCallable(TDqSource::CallableName()) ||
node.IsCallable(TDqLookupSourceWrap::CallableName())
)
)
{
Expand Down
87 changes: 75 additions & 12 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "kqp_query_compiler.h"

#include <ydb/core/base/table_index.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
#include <ydb/core/kqp/opt/kqp_opt.h>
Expand All @@ -8,24 +9,23 @@
#include <ydb/core/kqp/query_compiler/kqp_olap_compiler.h>
#include <ydb/core/kqp/query_data/kqp_predictor.h>
#include <ydb/core/kqp/query_data/kqp_request_predictor.h>
#include <ydb/core/ydb_convert/ydb_convert.h>

#include <ydb/core/base/table_index.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/mkql_proto/mkql_proto.h>

#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
#include <ydb/library/yql/dq/opt/dq_opt.h>
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
#include <ydb/library/yql/dq/tasks/dq_task_program.h>
#include <yql/essentials/minikql/mkql_node_serialization.h>
#include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/s3/statistics/yql_s3_statistics.h>

#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
#include <yql/essentials/core/yql_opt_utils.h>
#include <yql/essentials/core/yql_type_helpers.h>
#include <yql/essentials/minikql/mkql_node_serialization.h>
#include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/providers/common/structured_token/yql_token_builder.h>


namespace NKikimr {
Expand Down Expand Up @@ -589,6 +589,14 @@ TIssues ApplyOverridePlannerSettings(const TString& overridePlannerJson, NKqpPro
return issues;
}

// Returns the part of `keyName` that follows its last '.' character.
// When `keyName` contains no '.', it is returned unchanged.
// The result is a view into the same buffer as `keyName`; no copy is made.
TStringBuf RemoveJoinAliases(TStringBuf keyName) {
    const auto lastDot = keyName.find_last_of('.');
    if (lastDot == TString::npos) {
        return keyName;
    }

    return keyName.substr(lastDot + 1);
}

class TKqpQueryCompiler : public IKqpQueryCompiler {
public:
TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
Expand Down Expand Up @@ -795,7 +803,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
auto connection = input.Cast<TDqConnection>();

auto& protoInput = *stageProto.AddInputs();
FillConnection(connection, stagesMap, protoInput, ctx, tablesMap, physicalStageByID);
FillConnection(connection, stagesMap, protoInput, ctx, tablesMap, physicalStageByID, &stage, inputIndex);
protoInput.SetInputIndex(inputIndex);
}
}
Expand Down Expand Up @@ -1017,7 +1025,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {

auto& resultProto = *txProto.AddResults();
auto& connectionProto = *resultProto.MutableConnection();
FillConnection(connection, stagesMap, connectionProto, ctx, tablesMap, physicalStageByID);
FillConnection(connection, stagesMap, connectionProto, ctx, tablesMap, physicalStageByID, nullptr, 0);

const TTypeAnnotationNode* itemType = nullptr;
switch (connectionProto.GetTypeCase()) {
Expand Down Expand Up @@ -1452,7 +1460,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
NKqpProto::TKqpPhyConnection& connectionProto,
TExprContext& ctx,
THashMap<TStringBuf, THashSet<TStringBuf>>& tablesMap,
THashMap<ui64, NKqpProto::TKqpPhyStage*>& physicalStageByID
THashMap<ui64, NKqpProto::TKqpPhyStage*>& physicalStageByID,
const TDqPhyStage* stage,
ui32 inputIndex
) {
auto inputStageIndex = stagesMap.FindPtr(connection.Output().Stage().Ref().UniqueId());
YQL_ENSURE(inputStageIndex, "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map: "
Expand Down Expand Up @@ -1819,6 +1829,59 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
return;
}

if (auto maybeDqSourceStreamLookup = connection.Maybe<TDqCnStreamLookup>()) {
const auto streamLookup = maybeDqSourceStreamLookup.Cast();
const auto lookupSourceWrap = streamLookup.RightInput().Cast<TDqLookupSourceWrap>();

const TStringBuf dataSourceCategory = lookupSourceWrap.DataSource().Category();
const auto provider = TypesCtx.DataSourceMap.find(dataSourceCategory);
YQL_ENSURE(provider != TypesCtx.DataSourceMap.end(), "Unsupported data source category: \"" << dataSourceCategory << "\"");
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");

auto& dqSourceLookupCn = *connectionProto.MutableDqSourceStreamLookup();
auto& lookupSource = *dqSourceLookupCn.MutableLookupSource();
auto& lookupSourceSettings = *lookupSource.MutableSettings();
auto& lookupSourceType = *lookupSource.MutableType();
dqIntegration->FillLookupSourceSettings(lookupSourceWrap.Ref(), lookupSourceSettings, lookupSourceType);
YQL_ENSURE(!lookupSourceSettings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(lookupSourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");

const auto& streamLookupOutput = streamLookup.Output();
const auto connectionInputRowType = GetSeqItemType(streamLookupOutput.Ref().GetTypeAnn());
YQL_ENSURE(connectionInputRowType->GetKind() == ETypeAnnotationKind::Struct);
const auto connectionOutputRowType = GetSeqItemType(streamLookup.Ref().GetTypeAnn());
YQL_ENSURE(connectionOutputRowType->GetKind() == ETypeAnnotationKind::Struct);
YQL_ENSURE(stage);
dqSourceLookupCn.SetConnectionInputRowType(NYql::NCommon::GetSerializedTypeAnnotation(connectionInputRowType));
dqSourceLookupCn.SetConnectionOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(connectionOutputRowType));
dqSourceLookupCn.SetLookupRowType(NYql::NCommon::GetSerializedTypeAnnotation(lookupSourceWrap.RowType().Ref().GetTypeAnn()));
dqSourceLookupCn.SetInputStageRowType(NYql::NCommon::GetSerializedTypeAnnotation(GetSeqItemType(streamLookupOutput.Stage().Program().Ref().GetTypeAnn())));
dqSourceLookupCn.SetOutputStageRowType(NYql::NCommon::GetSerializedTypeAnnotation(GetSeqItemType(stage->Program().Args().Arg(inputIndex).Ref().GetTypeAnn())));

const TString leftLabel(streamLookup.LeftLabel());
dqSourceLookupCn.SetLeftLabel(leftLabel);
dqSourceLookupCn.SetRightLabel(streamLookup.RightLabel().StringValue());
dqSourceLookupCn.SetJoinType(streamLookup.JoinType().StringValue());
dqSourceLookupCn.SetCacheLimit(FromString<ui64>(streamLookup.MaxCachedRows()));
dqSourceLookupCn.SetCacheTtlSeconds(FromString<ui64>(streamLookup.TTL()));
dqSourceLookupCn.SetMaxDelayedRows(FromString<ui64>(streamLookup.MaxDelayedRows()));

if (const auto maybeMultiget = streamLookup.IsMultiget()) {
dqSourceLookupCn.SetIsMultiGet(FromString<bool>(maybeMultiget.Cast()));
}

for (const auto& key : streamLookup.LeftJoinKeyNames()) {
*dqSourceLookupCn.AddLeftJoinKeyNames() = leftLabel ? RemoveJoinAliases(key) : key;
}

for (const auto& key : streamLookup.RightJoinKeyNames()) {
*dqSourceLookupCn.AddRightJoinKeyNames() = RemoveJoinAliases(key);
}

return;
}

YQL_ENSURE(false, "Unexpected connection type: " << connection.CallableName());
}

Expand Down
Loading
Loading