Join two nodes when one has no data #1871

Open
ASKozienko opened this issue Mar 21, 2018 · 8 comments

@ASKozienko

For example, I have two metrics (temp1, temp2), but only one has incoming data. Maybe the temp2 sensor is turned off; it does not matter. For my condition "t1.value" > 0 OR "t2.value" > 0, temp2 is not relevant. But I don't see any point with the prefix P1 in the log after the join, so the join does not happen. So the question is: how do I make a full outer join in my case?

PS: If temp2 has incoming data, it works fine.

var temp1 = batch
    |query('SELECT mean(value) AS value FROM "my_db"."autogen".temp1')
        .period(10s)
        .every(5s)
        .groupBy(time(2s), *)
        .fill(0)
    |log()
        .prefix('P0-1')
        .level('DEBUG')

var temp2 = batch
    |query('SELECT mean(value) AS value FROM "my_db"."autogen".temp2')
        .period(10s)
        .every(5s)
        .groupBy(time(2s), *)
        .fill(0)
    |log()
        .prefix('P0-2')
        .level('DEBUG')

temp1
    |join(temp2)
        .tolerance(5s)
        .fill('null')
        .as('t1', 't2')
    |default()
        .field('t1.value', -1)
        .field('t2.value', -1)
    |log()
        .prefix('P1')
        .level('DEBUG')
    |where(lambda: "t1.value" > 0 OR "t2.value" > 0)
    |alert()
@conet

conet commented May 26, 2018

I'm seeing the same behavior: fill('null') or fill(value) does not work on the JoinNode. I'm using Kapacitor 1.4.1.

@conet

conet commented May 26, 2018

Looking at the join code I have a hunch that I'm hitting this condition.

For example, suppose there are measurements A and B, where A has series A,x, A,y and A,z, and B has series B,x and B,y. The outer join will only join A,B,x and A,B,y because of that condition. If B had a series B,z with some points missing, the outer join would work as defined.

Could this be fixed or at least the documentation be updated to avoid confusion?
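
The A/B example above can be modelled in a few lines of Python. This is only a toy sketch of the behaviour described in this thread, not Kapacitor's code: the function name `joined_groups` and the dictionary layout are hypothetical, and the rule "a group is emitted only when every parent has delivered at least one point for it" is an assumption taken from the comment above.

```python
# Toy model of the behaviour described above; NOT Kapacitor's actual code.
# Assumption: a joined group is only emitted when every parent stream has
# delivered at least one point for that group.

def joined_groups(parents):
    """Return the group keys that appear in every parent stream."""
    key_sets = [set(series) for series in parents.values()]
    return sorted(set.intersection(*key_sets))

# Series per measurement, keyed by group (tag value).
parents = {
    "A": {"x": [1.0], "y": [2.0], "z": [3.0]},
    "B": {"x": [4.0], "y": [5.0]},  # B never reports group z
}

print(joined_groups(parents))  # ['x', 'y']
```

Under this model, group z is dropped regardless of the fill setting, because fill would only apply to groups the join already knows about from both sides.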

@sdicataldo

Hey guys,
It's not clear whether this issue has been sorted.
I have a scenario where a batch query returns no data.
How can I manage this scenario?
Neither join nor union works.

@timhallinflux timhallinflux added this to the 1.5.4 milestone Aug 22, 2019
@oraclecaicai

Kapacitor: 1.5.3
InfluxDB: 1.7.8

Hi guys,
I have a TICKscript like this:

var req_all = stream
    |from()
        .measurement('request_log')
    |window()
        .period(1m)
        .every(1m)
    |count('step')
        .as('cnt')

var req_err = stream
    |from()
        .measurement('request_log')
        .where(lambda: "status" =~ /^5\d\d$/)
    |window()
        .period(1m)
        .every(1m)
    |count('step')
        .as('cnt')

req_err
    |join(req_all)
        .as('err', 'all')
        .fill(0)
    |eval(lambda: "all.cnt", lambda: (float("all.cnt") - float("err.cnt")) / float("all.cnt") * 100.0)
        .as('qpm', 'succ_rate')
    |influxDBOut()
        .database('data')
        .measurement('request_stats')

It doesn't work properly. The symptom is the same as described above: I cannot see the result data in InfluxDB while the task is running. However, the result data is written to InfluxDB when I disable the task.

from4 [avg_exec_time_ns="18.189µs" errors="0" working_cardinality="0" ];
from4 -> window5 [processed="0"];

window5 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
window5 -> count6 [processed="0"];

count6 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
count6 -> join8 [processed="0"];

from1 [avg_exec_time_ns="22.796µs" errors="0" working_cardinality="0" ];
from1 -> window2 [processed="713133"];

window2 [avg_exec_time_ns="28.023µs" errors="0" working_cardinality="1" ];
window2 -> count3 [processed="2825"];

count3 [avg_exec_time_ns="6.653µs" errors="0" working_cardinality="1" ];
count3 -> join8 [processed="2825"];

join8 [avg_exec_time_ns="140.31µs" errors="0" working_cardinality="1" ];
join8 -> eval9 [processed="0"];

eval9 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
eval9 -> influxdb_out10 [processed="0"];

influxdb_out10 [avg_exec_time_ns="0s" errors="0" points_written="0" working_cardinality="0" write_errors="0" ];

@dima100

dima100 commented Sep 19, 2019

The workaround for me was to union the two streams first, and then join the complete stream with the unioned stream. Pretty ugly, but it does the job if you have one full and one partial stream and want to outer join them.
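
Why this might help can be sketched with a toy model in Python. It is not Kapacitor code and says nothing about timing or batching; the helper names `join_keys` and `union` are hypothetical, and the rule that a join emits a group only when both parents have data for it is an assumption based on the behaviour reported in this thread.

```python
# Toy model only; assumes a join emits a group only when every parent has it.

def join_keys(left, right):
    """Groups a two-way join would emit under the assumption above."""
    return sorted(set(left) & set(right))

def union(left, right):
    """Merge two streams group by group, concatenating their points."""
    merged = {key: list(points) for key, points in left.items()}
    for key, points in right.items():
        merged.setdefault(key, []).extend(points)
    return merged

full = {"x": [1.0], "y": [2.0], "z": [3.0]}   # stream that always has data
partial = {"x": [4.0]}                        # stream that is often empty

print(join_keys(full, partial))               # ['x']
print(join_keys(full, union(full, partial)))  # ['x', 'y', 'z']
```

In this model the unioned side always contains every group of the full stream, so no group is dropped. The cost is that the unioned side mixes points from both inputs, which still have to be told apart afterwards.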

@oraclecaicai

oraclecaicai commented Oct 4, 2019

The workaround for me was to union the two streams first, and then join the complete stream with the unioned stream. Pretty ugly, but it does the job if you have one full and one partial stream and want to outer join them.

Thank you, man. Now I'm using a method similar to yours.
It seems that each stream in a join must have data coming in in every window, or else the join will not output any data. In my scenario, I use the tail plugin of Fluent Bit to fetch data from our application logs. To make sure at least one error entry occurs in every window, I use another plugin named exec to echo a dummy entry every minute, and I add a type field to distinguish the dummy entries from the genuine ones.
Then in Kapacitor, I use WhereNode to divide the original stream into three: all entries including the dummy ones (for count only), error entries including the dummy ones (for count only), and all entries without the dummy ones (for count and other aggregations). Each stream has at least one point in every minute. After some other nodes such as GroupByNode, the multiple streams generated from these three streams are joined. Then I use an EvalNode, and in one of its lambda expressions I subtract the ERROR count from the ALL count. Because both counts include the dummy entries, the subtraction cancels them out; the result is then divided by the count of the third stream. You may have guessed that I want to work out the success rate.
PS: I don't use the where property method of the FromNode, because it comes before the WindowNode. There seems to be a deviation in the window range when streams that each have their own WindowNode are joined, even if the parameters of their every and period methods are identical and the align methods are also used.
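
The dummy-entry arithmetic described above can be checked with a small Python sketch. The function name `success_rate`, its parameter names, and the sample numbers are made up for illustration, and returning `None` for a window without genuine entries is an assumption, not something stated in the comment.

```python
def success_rate(all_with_dummy, err_with_dummy, genuine_total):
    """Success rate in percent, or None when there were no genuine entries."""
    if genuine_total == 0:
        return None  # assumption: an empty window is undefined, not 0%
    succeeded = all_with_dummy - err_with_dummy  # dummy entries cancel here
    return succeeded * 100.0 / genuine_total

# One window: 200 genuine requests, 5 of them 5xx, plus 1 dummy entry
# that is counted in both the ALL stream and the ERROR stream.
dummies = 1
print(success_rate(200 + dummies, 5 + dummies, 200))  # 97.5
```

Because the same number of dummy entries is added to both counts, the difference equals the number of genuine non-error entries, so the dummies only serve to keep every stream non-empty.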

@prmkbr

prmkbr commented May 14, 2020

The workaround for me was to union the two streams first, and then join the complete stream with the unioned stream. Pretty ugly, but it does the job if you have one full and one partial stream and want to outer join them.

Could you share a sample on how you got this working?

I tried this approach, but I'm not seeing the results expected for an outer join; it's still giving the results of an inner join.

var s1 = batch | query(..).period(1h).every(10s).groupBy('f1', 'f2')
var s2 = batch | query(..).period(1h).every(10s).groupBy('f1', 'f2')

var u1 = s1 | union(s2)

s1 | join(u1) | log()

Another problem I observed was with the results of the union itself.

Sometimes they are split across different runs of the batch:

15:10:12 -> s1r1, s1r2
15:10:22 -> s2r1, s1r1, s1r2, s2r1

where s1r1 & s1r2 are results of s1 and s2r1 is that of s2

@rluetzner

This issue has been open for quite some time and many people (including me) ran into this problem. I think at least the documentation for JoinNode should include a big warning box that the join will not happen when one of the streams has no data points incoming.
