Skip to content

Commit bc4b96d

Browse files
authored
Merge 464b6ca into 580503a
2 parents 580503a + 464b6ca commit bc4b96d

File tree

26 files changed

+1005
-283
lines changed

26 files changed

+1005
-283
lines changed

ydb/core/kqp/common/kqp_tx.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
392392
case NKqpProto::TKqpPhyConnection::kResult:
393393
case NKqpProto::TKqpPhyConnection::kValue:
394394
case NKqpProto::TKqpPhyConnection::kMerge:
395+
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup:
395396
case NKqpProto::TKqpPhyConnection::TYPE_NOT_SET:
396397
break;
397398
}

ydb/core/kqp/common/kqp_user_request_context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace NKikimr::NKqp {
1919
NActors::TActorId RunScriptActorId;
2020
TString PoolId;
2121
std::optional<NResourcePool::TPoolSettings> PoolConfig;
22+
bool IsStreamingQuery = false;
2223

2324
TUserRequestContext() = default;
2425

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
9696
} else {
9797
Config->_ResultRowsLimit.Clear();
9898
}
99+
100+
if (UserRequestContext && UserRequestContext->IsStreamingQuery) {
101+
Config->HashJoinMode = NYql::NDq::EHashJoinMode::Map;
102+
}
99103
}
100104
PerStatementResult = perStatementResult && Config->EnablePerStatementQueryExecution;
101105

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@
66
#include <ydb/core/base/appdata.h>
77
#include <ydb/core/kqp/runtime/kqp_compute.h>
88
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
9-
#include <ydb/core/kqp/runtime/kqp_write_actor.h>
109
#include <ydb/core/kqp/runtime/kqp_read_table.h>
1110
#include <ydb/core/kqp/runtime/kqp_sequencer_factory.h>
1211
#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
1312
#include <ydb/core/kqp/runtime/kqp_vector_actor.h>
14-
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
13+
#include <ydb/core/kqp/runtime/kqp_write_actor.h>
1514
#include <ydb/library/formats/arrow/protos/ssa.pb.h>
15+
#include <ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup_factory.h>
16+
#include <ydb/library/yql/dq/comp_nodes/dq_block_hash_join.h>
17+
#include <ydb/library/yql/dq/comp_nodes/dq_hash_combine.h>
1618
#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
17-
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
19+
#include <ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.h>
1820
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
1921
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
20-
#include <ydb/library/yql/dq/comp_nodes/dq_block_hash_join.h>
21-
#include <ydb/library/yql/dq/comp_nodes/dq_hash_combine.h>
22+
#include <ydb/library/yql/providers/solomon/actors/dq_solomon_read_actor.h>
2223

2324
namespace NKikimr {
2425
namespace NMiniKQL {
@@ -90,6 +91,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
9091
RegisterKqpWriteActor(*factory, counters);
9192
RegisterSequencerActorFactory(*factory, counters);
9293
RegisterKqpVectorResolveActor(*factory, counters);
94+
NYql::NDq::RegisterDqInputTransformLookupActorFactory(*factory);
9395

9496
if (federatedQuerySetup) {
9597
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});

ydb/core/kqp/compute_actor/ya.make

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ PEERDIR(
2525
ydb/library/formats/arrow/protos
2626
ydb/library/formats/arrow/common
2727
ydb/library/yql/dq/actors/compute
28+
ydb/library/yql/dq/actors/input_transforms
29+
ydb/library/yql/dq/comp_nodes
2830
ydb/library/yql/providers/generic/actors
2931
ydb/library/yql/providers/pq/async_io
3032
ydb/library/yql/providers/s3/actors_factory
3133
ydb/library/yql/providers/solomon/actors
3234
yql/essentials/public/issue
33-
ydb/library/yql/dq/comp_nodes
3435
)
3536

3637
GENERATE_ENUM_SERIALIZATION(kqp_compute_state.h)

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,47 @@ void TKqpTasksGraph::BuildVectorResolveChannels(const TStageInfo& stageInfo, ui3
517517
inputStageInfo, outputIndex, enableSpilling, logFunc);
518518
}
519519

520+
void TKqpTasksGraph::BuildDqSourceStreamLookupChannels(const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo,
521+
ui32 outputIndex, const NKqpProto::TKqpPhyCnDqSourceStreamLookup& dqSourceStreamLookup, const TChannelLogFunc& logFunc) {
522+
YQL_ENSURE(stageInfo.Tasks.size() == 1);
523+
524+
auto* settings = GetMeta().Allocate<NDqProto::TDqInputTransformLookupSettings>();
525+
settings->SetLeftLabel(dqSourceStreamLookup.GetLeftLabel());
526+
settings->SetRightLabel(dqSourceStreamLookup.GetRightLabel());
527+
settings->SetJoinType(dqSourceStreamLookup.GetJoinType());
528+
settings->SetNarrowInputRowType(dqSourceStreamLookup.GetConnectionInputRowType());
529+
settings->SetNarrowOutputRowType(dqSourceStreamLookup.GetConnectionOutputRowType());
530+
settings->SetCacheLimit(dqSourceStreamLookup.GetCacheLimit());
531+
settings->SetCacheTtlSeconds(dqSourceStreamLookup.GetCacheTtlSeconds());
532+
settings->SetMaxDelayedRows(dqSourceStreamLookup.GetMaxDelayedRows());
533+
settings->SetIsMultiget(dqSourceStreamLookup.GetIsMultiGet());
534+
535+
const auto& leftJointKeys = dqSourceStreamLookup.GetLeftJoinKeyNames();
536+
settings->MutableLeftJoinKeyNames()->Assign(leftJointKeys.begin(), leftJointKeys.end());
537+
538+
const auto& rightJointKeys = dqSourceStreamLookup.GetRightJoinKeyNames();
539+
settings->MutableRightJoinKeyNames()->Assign(rightJointKeys.begin(), rightJointKeys.end());
540+
541+
auto& streamLookupSource = *settings->MutableRightSource();
542+
streamLookupSource.SetSerializedRowType(dqSourceStreamLookup.GetLookupRowType());
543+
const auto& compiledSource = dqSourceStreamLookup.GetLookupSource();
544+
streamLookupSource.SetProviderName(compiledSource.GetType());
545+
*streamLookupSource.MutableLookupSource() = compiledSource.GetSettings();
546+
547+
TTransform dqSourceStreamLookupTransform = {
548+
.Type = "StreamLookupInputTransform",
549+
.InputType = dqSourceStreamLookup.GetInputStageRowType(),
550+
.OutputType = dqSourceStreamLookup.GetOutputStageRowType(),
551+
};
552+
YQL_ENSURE(dqSourceStreamLookupTransform.Settings.PackFrom(*settings));
553+
554+
for (const auto taskId : stageInfo.Tasks) {
555+
GetTask(taskId).Inputs[inputIndex].Transform = dqSourceStreamLookupTransform;
556+
}
557+
558+
BuildUnionAllChannels(*this, stageInfo, inputIndex, inputStageInfo, outputIndex, /* enableSpilling */ false, logFunc);
559+
}
560+
520561
void TKqpTasksGraph::BuildKqpStageChannels(TStageInfo& stageInfo, ui64 txId, bool enableSpilling, bool enableShuffleElimination) {
521562
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
522563

@@ -709,6 +750,12 @@ void TKqpTasksGraph::BuildKqpStageChannels(TStageInfo& stageInfo, ui64 txId, boo
709750
break;
710751
}
711752

753+
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup: {
754+
BuildDqSourceStreamLookupChannels(stageInfo, inputIdx, inputStageInfo, outputIdx,
755+
input.GetDqSourceStreamLookup(), log);
756+
break;
757+
}
758+
712759
default:
713760
YQL_ENSURE(false, "Unexpected stage input type: " << (ui32)input.GetTypeCase());
714761
}
@@ -1369,6 +1416,8 @@ void TKqpTasksGraph::FillInputDesc(NYql::NDqProto::TTaskInput& inputDesc, const
13691416
}
13701417

13711418
transformProto->MutableSettings()->PackFrom(*input.Meta.VectorResolveSettings);
1419+
} else {
1420+
*transformProto->MutableSettings() = input.Transform->Settings;
13721421
}
13731422
}
13741423
}
@@ -1724,6 +1773,7 @@ bool TKqpTasksGraph::BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCo
17241773
case NKqpProto::TKqpPhyConnection::kMap:
17251774
case NKqpProto::TKqpPhyConnection::kParallelUnionAll:
17261775
case NKqpProto::TKqpPhyConnection::kVectorResolve:
1776+
case NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup:
17271777
break;
17281778
default:
17291779
YQL_ENSURE(false, "Unexpected connection type: " << (ui32)input.GetTypeCase() << Endl);

ydb/core/kqp/executer_actor/kqp_tasks_graph.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,8 @@ class TKqpTasksGraph : public NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMet
422422
void BuildVectorResolveChannels(const TStageInfo& stageInfo, ui32 inputIndex,
423423
const TStageInfo& inputStageInfo, ui32 outputIndex,
424424
const NKqpProto::TKqpPhyCnVectorResolve& vectorResolve, bool enableSpilling, const NYql::NDq::TChannelLogFunc& logFunc);
425+
void BuildDqSourceStreamLookupChannels(const TStageInfo& stageInfo, ui32 inputIndex, const TStageInfo& inputStageInfo,
426+
ui32 outputIndex, const NKqpProto::TKqpPhyCnDqSourceStreamLookup& dqSourceStreamLookup, const NYql::NDq::TChannelLogFunc& logFunc);
425427

426428
void FillOutputDesc(NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, ui32 outputIdx,
427429
bool enableSpilling, const TStageInfo& stageInfo) const;

ydb/core/kqp/gateway/behaviour/streaming_query/optimization.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ bool ExploreStreamingQueryNode(TExprNode::TPtr node, TStreamingExploreCtx& res)
3939
const auto providerArg = node->ChildPtr(1);
4040
if (const auto maybeDataSource = TMaybeNode<TCoDataSource>(providerArg)) {
4141
const auto dataSourceCategory = maybeDataSource.Cast().Category().Value();
42-
if (dataSourceCategory == NYql::PqProviderName) {
43-
++res.StreamingReads;
42+
if (IsIn({NYql::PqProviderName, NYql::S3ProviderName, NYql::GenericProviderName}, dataSourceCategory)) {
43+
res.StreamingReads += dataSourceCategory == NYql::PqProviderName;
4444
return true;
4545
}
4646

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1773,12 +1773,18 @@ class TKqpHost : public IKqpHost {
17731773
}
17741774

17751775
auto state = MakeIntrusive<NYql::TS3State>();
1776+
1777+
auto& configuration = *state->Configuration;
1778+
if (const auto requestContext = SessionCtx->GetUserRequestContext(); requestContext && requestContext->IsStreamingQuery) {
1779+
configuration.DisablePragma(configuration.UseRuntimeListing, false, "Runtime listing is not supported for streaming queries, pragma value was ignored");
1780+
}
1781+
configuration.WriteThroughDqIntegration = true;
1782+
configuration.AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
1783+
configuration.Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
1784+
17761785
state->Types = TypesCtx.Get();
17771786
state->FunctionRegistry = FuncRegistry;
17781787
state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
1779-
state->Configuration->WriteThroughDqIntegration = true;
1780-
state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
1781-
state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
17821788
state->Gateway = FederatedQuerySetup->HttpGateway;
17831789
state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
17841790
state->ExecutorPoolId = AppData()->UserPoolId;
@@ -1980,6 +1986,7 @@ class TKqpHost : public IKqpHost {
19801986
auto configProvider = CreateConfigProvider(*TypesCtx, gatewaysConfig, {}, allowSettings);
19811987
TypesCtx->AddDataSource(ConfigProviderName, configProvider);
19821988
TypesCtx->MatchRecognize = QueryServiceConfig.GetEnableMatchRecognize();
1989+
TypesCtx->StreamLookupJoin = true;
19831990

19841991
YQL_ENSURE(TypesCtx->Initialize(*ExprCtx));
19851992

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
4141
AddHandler(0, &TCoTake::Match, HNDL(RewriteTakeSortToTopSort));
4242
AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInToEquiJoin));
4343
AddHandler(0, &TCoFlatMap::Match, HNDL(RewriteSqlInCompactToJoin));
44+
AddHandler(0, &TCoEquiJoin::Match, HNDL(RewriteStreamEquiJoinWithLookup));
4445
AddHandler(0, &TCoEquiJoin::Match, HNDL(OptimizeEquiJoinWithCosts));
4546
AddHandler(0, &TCoEquiJoin::Match, HNDL(RewriteEquiJoin));
4647
AddHandler(0, &TDqJoin::Match, HNDL(JoinToIndexLookup));
@@ -167,6 +168,12 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
167168
return output;
168169
}
169170

171+
TMaybeNode<TExprBase> RewriteStreamEquiJoinWithLookup(TExprBase node, TExprContext& ctx) {
172+
TExprBase output = DqRewriteStreamEquiJoinWithLookup(node, ctx, TypesCtx);
173+
DumpAppliedRule("KqpRewriteStreamEquiJoinWithLookup", node.Ptr(), output.Ptr(), ctx);
174+
return output;
175+
}
176+
170177
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
171178
auto maxDPhypDPTableSize = Config->MaxDPHypDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPHypDPTableSize);
172179
auto optLevel = Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel);

0 commit comments

Comments
 (0)