@@ -406,6 +406,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
406406 }
407407
408408 ProcessOutputsState.LastRunStatus = status;
409+ ProcessOutputsState.LastRunTime = TInstant::Now ();
409410
410411 for (auto & entry : OutputChannelsMap) {
411412 const ui64 channelId = entry.first ;
@@ -855,25 +856,29 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
855856
856857protected:
857858 struct TInputChannelInfo {
859+ ui32 InputIndex;
858860 TString LogPrefix;
859861 ui64 ChannelId;
860862 ui32 SrcStageId;
861863 IDqInputChannel::TPtr Channel;
862864 bool HasPeer = false ;
865+ NActors::TActorId PeerId;
863866 const NDqProto::EWatermarksMode WatermarksMode;
864867 const TDuration WatermarksIdleTimeout = TDuration::Max();
865868 std::optional<NDqProto::TCheckpoint> PendingCheckpoint;
866869 const NDqProto::ECheckpointingMode CheckpointingMode;
867870 i64 FreeSpace = 0 ;
868871
869872 explicit TInputChannelInfo (
873+ ui32 inputIndex,
870874 const TString& logPrefix,
871875 ui64 channelId,
872876 ui32 srcStageId,
873877 NDqProto::EWatermarksMode watermarksMode,
874878 NDqProto::ECheckpointingMode checkpointingMode,
875879 TDuration watermarksIdleTimeout)
876- : LogPrefix(logPrefix)
880+ : InputIndex(inputIndex)
881+ , LogPrefix(logPrefix)
877882 , ChannelId(channelId)
878883 , SrcStageId(srcStageId)
879884 , WatermarksMode(watermarksMode)
@@ -919,6 +924,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
919924 ui32 DstStageId;
920925 IDqOutputChannel::TPtr Channel;
921926 bool HasPeer = false ;
927+ NActors::TActorId PeerId;
922928 bool Finished = false ; // != Channel->IsFinished() // If channel is in finished state, it sends only checkpoints.
923929 bool EarlyFinish = false ;
924930 bool PopStarted = false ;
@@ -1089,6 +1095,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
10891095
10901096 Channels->SetInputChannelPeer (channelUpdate.GetId (), peer);
10911097 inputChannel->HasPeer = true ;
1098+ inputChannel->PeerId = peer;
10921099
10931100 continue ;
10941101 }
@@ -1101,6 +1108,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
11011108
11021109 Channels->SetOutputChannelPeer (channelUpdate.GetId (), peer);
11031110 outputChannel->HasPeer = true ;
1111+ outputChannel->PeerId = peer;
11041112 if (outputChannel->Channel ) {
11051113 outputChannel->Channel ->UpdateSettings ({.IsLocalChannel = peer.NodeId () == this ->SelfId ().NodeId ()});
11061114 }
@@ -1276,8 +1284,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
12761284 void MonitoringExtra (TStringStream&) {
12771285 }
12781286
1279- void OnMonitoringPage (NActors::NMon::TEvHttpInfo::TPtr& ev) {
1280- TStringStream html;
1287+ void DumpForMonitoring (TStringStream& html) {
12811288 static_cast <TDerived*>(this )->MonitoringExtra (html);
12821289
12831290#define DUMP (P, X,...) html << #X " : " << P.X __VA_ARGS__ << " <br />"
@@ -1533,8 +1540,181 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
15331540 }
15341541#undef DUMP
15351542#undef DUMP_PREFIXED
1543+ }
1544+
1545+ void DefaultMonitoringPage (TStringStream& str) {
1546+ HTML (str) {
1547+ PRE () {
1548+ str << " TDqComputeActorBase, SelfId=" << this ->SelfId () << ' ' ;
1549+ HREF (TStringBuilder () << " ?ca=" << this ->SelfId () << " &view=dump" ) {
1550+ str << " Dump" ;
1551+ }
1552+ str << ' ' ;
1553+ HREF (TStringBuilder () << " ?ca=" << this ->SelfId () << " &view=run" ) {
1554+ str << " Run" ;
1555+ }
1556+ str << Endl;
1557+
1558+ COLLAPSED_BUTTON_CONTENT (" ProcessOutputsState" , TStringBuilder () << " ProcessOutputsState: " << ProcessOutputsState.LastRunTime << ' ' << ProcessOutputsState.LastRunStatus ) {
1559+ str << " Inflight: " << ProcessOutputsState.Inflight << Endl;
1560+ str << " ChannelsReady: " << ProcessOutputsState.ChannelsReady << Endl;
1561+ str << " HasDataToSend: " << ProcessOutputsState.HasDataToSend << Endl;
1562+ str << " DataWasSent: " << ProcessOutputsState.DataWasSent << Endl;
1563+ str << " AllOutputsFinished: " << ProcessOutputsState.AllOutputsFinished << Endl;
1564+ str << " LastRunStatus: " << ProcessOutputsState.LastRunStatus << Endl;
1565+ str << " LastRunTime: " << ProcessOutputsState.LastRunTime << Endl;
1566+ str << " LastPopReturnedNoData: " << ProcessOutputsState.LastPopReturnedNoData << Endl;
1567+ }
1568+
1569+ str << Endl << " Input Channels:" << Endl;
1570+ TABLE_SORTABLE_CLASS (" table table-condensed" ) {
1571+ TABLEHEAD () {
1572+ TABLER () {
1573+ TABLEH_ATTRS ({{" title" , " ChannelId" }}) {str << " Id" ;}
1574+ TABLEH_ATTRS ({{" title" , " SrcStageId" }}) {str << " Src" ;}
1575+ TABLEH_ATTRS ({{" title" , " InputIndex" }}) {str << " Idx" ;}
1576+ TABLEH () {str << " PeerId" ;}
1577+ TABLEH_ATTRS ({{" title" , " IsFinished" }}) {str << " F" ;}
1578+ TABLEH () {str << " Push.Bytes" ;}
1579+ TABLEH () {str << " Push.Rows" ;}
1580+ TABLEH () {str << " Pop.Bytes" ;}
1581+ TABLEH () {str << " Pop.Rows" ;}
1582+ }
1583+ }
1584+ TABLEBODY () {
1585+ for (const auto & [id, info]: InputChannelsMap) {
1586+ TABLER () {
1587+ TABLED () {str << info.ChannelId ;}
1588+ TABLED () {str << info.SrcStageId ;}
1589+ TABLED () {str << info.InputIndex ;}
1590+ TABLED () {
1591+ if (info.HasPeer ) {
1592+ HREF (TStringBuilder () << " /node/" << info.PeerId .NodeId () << " /actors/kqp_node?ca=" << info.PeerId ) {
1593+ str << info.PeerId ;
1594+ }
1595+ } else {
1596+ str << " N/A" ;
1597+ }
1598+ }
15361599
1537- this ->Send (ev->Sender , new NActors::NMon::TEvHttpInfoRes (html.Str ()));
1600+ auto channel = info.Channel ;
1601+ if (!channel) {
1602+ auto stats = GetTaskRunnerStats ();
1603+ if (stats) {
1604+ auto stageIt = stats->InputChannels .find (info.SrcStageId );
1605+ if (stageIt != stats->InputChannels .end ()) {
1606+ auto channelIt = stageIt->second .find (info.ChannelId );
1607+ if (channelIt != stageIt->second .end ()) {
1608+ channel = channelIt->second ;
1609+ }
1610+ }
1611+ }
1612+ }
1613+
1614+ if (channel) {
1615+ TABLED () {str << channel->IsFinished ();}
1616+ auto & pushStats = channel->GetPushStats ();
1617+ TABLED () {str << pushStats.Bytes ;}
1618+ TABLED () {str << pushStats.Rows ;}
1619+ auto & popStats = channel->GetPopStats ();
1620+ TABLED () {str << popStats.Bytes ;}
1621+ TABLED () {str << popStats.Rows ;}
1622+ } else {
1623+ TABLED () {str << " N/A" ;}
1624+ TABLED () {str << " N/A" ;}
1625+ TABLED () {str << " N/A" ;}
1626+ TABLED () {str << " N/A" ;}
1627+ TABLED () {str << " N/A" ;}
1628+ }
1629+ }
1630+ }
1631+ }
1632+ }
1633+
1634+ str << Endl << " Output Channels:" << Endl;
1635+ TABLE_SORTABLE_CLASS (" table table-condensed" ) {
1636+ TABLEHEAD () {
1637+ TABLER () {
1638+ TABLEH_ATTRS ({{" title" , " ChannelId" }}) {str << " Id" ;}
1639+ TABLEH_ATTRS ({{" title" , " DstStageId" }}) {str << " Dst" ;}
1640+ TABLEH () {str << " PeerId" ;}
1641+ TABLEH_ATTRS ({{" title" , " Finished" }}) {str << " F" ;}
1642+ TABLEH_ATTRS ({{" title" , " EarlyFinish" }}) {str << " EF" ;}
1643+ TABLEH () {str << " Push.Bytes" ;}
1644+ TABLEH () {str << " Push.Rows" ;}
1645+ TABLEH () {str << " Pop.Bytes" ;}
1646+ TABLEH () {str << " Pop.Rows" ;}
1647+ }
1648+ }
1649+ TABLEBODY () {
1650+ for (const auto & [id, info]: OutputChannelsMap) {
1651+ TABLER () {
1652+ TABLED () {str << info.ChannelId ;}
1653+ TABLED () {str << info.DstStageId ;}
1654+ TABLED () {
1655+ if (info.HasPeer ) {
1656+ HREF (TStringBuilder () << " /node/" << info.PeerId .NodeId () << " /actors/kqp_node?ca=" << info.PeerId ) {
1657+ str << info.PeerId ;
1658+ }
1659+ } else {
1660+ str << " N/A" ;
1661+ }
1662+ }
1663+ TABLED () {str << info.Finished ;}
1664+ TABLED () {str << info.EarlyFinish ;}
1665+
1666+ auto channel = info.Channel ;
1667+ if (!channel) {
1668+ auto stats = GetTaskRunnerStats ();
1669+ if (stats) {
1670+ auto stageIt = stats->OutputChannels .find (info.DstStageId );
1671+ if (stageIt != stats->OutputChannels .end ()) {
1672+ auto channelIt = stageIt->second .find (info.ChannelId );
1673+ if (channelIt != stageIt->second .end ()) {
1674+ channel = channelIt->second ;
1675+ }
1676+ }
1677+ }
1678+ }
1679+
1680+ if (channel) {
1681+ auto & pushStats = channel->GetPushStats ();
1682+ TABLED () {str << pushStats.Bytes ;}
1683+ TABLED () {str << pushStats.Rows ;}
1684+ auto & popStats = channel->GetPopStats ();
1685+ TABLED () {str << popStats.Bytes ;}
1686+ TABLED () {str << popStats.Rows ;}
1687+ } else {
1688+ TABLED () {str << " N/A" ;}
1689+ TABLED () {str << " N/A" ;}
1690+ TABLED () {str << " N/A" ;}
1691+ TABLED () {str << " N/A" ;}
1692+ }
1693+ }
1694+ }
1695+ }
1696+ }
1697+ }
1698+ }
1699+ }
1700+
1701+ void OnMonitoringPage (NActors::NMon::TEvHttpInfo::TPtr& ev) {
1702+ TStringStream str;
1703+
1704+ const TCgiParameters &cgi = ev->Get ()->Request .GetParams ();
1705+ auto view = cgi.Get (" view" );
1706+ if (view == " dump" ) {
1707+ DumpForMonitoring (str);
1708+ } else if (view == " run" ) {
1709+ if (this ->Running ) {
1710+ this ->DoExecute ();
1711+ }
1712+ DefaultMonitoringPage (str);
1713+ } else {
1714+ DefaultMonitoringPage (str);
1715+ }
1716+
1717+ this ->Send (ev->Sender , new NActors::NMon::TEvHttpInfoRes (str.Str ()));
15381718 }
15391719
15401720protected:
@@ -1930,6 +2110,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
19302110 auto result = InputChannelsMap.emplace (
19312111 channel.GetId (),
19322112 TInputChannelInfo (
2113+ i,
19332114 LogPrefix,
19342115 channel.GetId (),
19352116 channel.GetSrcStageId (),
@@ -1967,7 +2148,10 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
19672148 for (auto & channel : outputDesc.GetChannels ()) {
19682149 TOutputChannelInfo outputChannel (channel.GetId (), channel.GetDstStageId ());
19692150 outputChannel.HasPeer = channel.GetDstEndpoint ().HasActorId ();
1970- outputChannel.IsTransformOutput = outputDesc.HasTransform ();
2151+ if (outputChannel.HasPeer ) {
2152+ outputChannel.PeerId = NActors::ActorIdFromProto (channel.GetDstEndpoint ().GetActorId ());
2153+ }
2154+ outputChannel.IsTransformOutput = outputDesc.HasTransform ();
19712155 outputChannel.WatermarksMode = channel.GetWatermarksMode ();
19722156
19732157 if (Y_UNLIKELY (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_PROFILE)) {
@@ -2366,6 +2550,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
23662550 bool DataWasSent = false ;
23672551 bool AllOutputsFinished = true ;
23682552 ERunStatus LastRunStatus = ERunStatus::PendingInput;
2553+ TInstant LastRunTime;
23692554 bool LastPopReturnedNoData = false ;
23702555 };
23712556 TProcessOutputsState ProcessOutputsState;
0 commit comments