Realtime Join #64
Greetings @cybertyche, any chance you will be able to provide feedback soon? Thanks in advance, @afriedma

My sincere apologies for the delay; I had the unfortunate luck of catching the flu this week. The behavior you describe is currently by design, but that doesn't mean it can't change. The situation is this: there are a number of scenarios where the join operator cannot reasonably produce its output until after time moves forward. These scenarios usually involve the possibility of an end edge arriving and thus "deactivating" a join result. That said, it seems we could be doing better here and be more proactive about pushing data from the join synopsis to the results. So, if this is more a question of "this is odd behavior, what's up", then that's what's up. :-) If this is more a question of "I really need lower latency and this issue is blocking me", then I will raise the priority of this issue.
Thank you @cybertyche for the reply. I hope you're feeling better, as warm weather seems to be almost upon us :) I am glad you confirmed this behavior. I am getting around this issue at the moment by merging in a punctuation stream with an interval of x, which forces Trill to generate output. I'd like to switch gears a bit, if you don't mind. I have another use case where I need to implement a sliding window; from what I gathered in the examples, you do it with `AlterEventDuration((start, end) => end - start + TimeSpan.FromSeconds(windowSize).Ticks)`. My question: once again I have a real-time scenario, and I create events with `StreamEvent.CreateStart`. The output I am getting is that events never leave the window. Am I doing something incorrectly, or is this the intended behavior? Thanks again, @afriedma

I would suggest using the built-in method `ExtendLifetime(long)` instead. Let me know if you still get the same behavior.
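The idea behind both `AlterEventDuration` and `ExtendLifetime` for a sliding window is that each event stays alive for the window size past its original end, so an aggregate evaluated at time t sees everything that arrived within the window. The sketch below models only that arithmetic in plain C#, without Trill; `Interval`, `Extend`, `IsActiveAt`, and `CountActive` are hypothetical names. It also illustrates why start edges on their own never leave the window: a start edge with no matching end edge has an unbounded end (modeled here as `long.MaxValue`), and extending an unbounded lifetime leaves it unbounded.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of event lifetimes; this is not Trill's API.
public readonly record struct Interval(long Start, long End)
{
    // Unbounded end, standing in for a start edge that never receives an end edge.
    public const long Infinity = long.MaxValue;

    // Sliding-window trick: keep the start, push the end out by the window size.
    public Interval Extend(long window) =>
        End == Infinity ? this : new Interval(Start, End + window);

    // Half-open lifetime [Start, End): active at t if Start <= t < End.
    public bool IsActiveAt(long t) => Start <= t && t < End;
}

public static class SlidingWindowModel
{
    // Number of events whose extended lifetime covers time t.
    public static int CountActive(IEnumerable<Interval> events, long window, long t) =>
        events.Select(e => e.Extend(window)).Count(e => e.IsActiveAt(t));
}
```

For example, with a window of 10 ticks, the point events `new Interval(0, 1)` and `new Interval(5, 6)` are both counted at tick 9 but only the second one is counted at tick 12, while `new Interval(0, Interval.Infinity)` is counted at every later tick.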
Just to follow up on my previous comment: I am trying to count children within a sliding window of 10 seconds, with a punctuation inserted every 5 seconds. My child object is keyed on id and parent id, and I have implemented GetHashCode() as well. If the id and parent id are the same, the child should not be counted twice. Here is the stream construct:
My test code is the following:
I am expecting a single record with a ReportedValue of 1, but instead I get the following output on StreamEventKind.Start:

ParentOrderId[2] - Market[188] - ReportedValue[1] - ReportTms[4/4/2019 6:16:53 PM]

Your input would be greatly appreciated.
Sorry, I missed your response. I did try the function you recommended, but I am getting the same unexpected result as in my previous comment; code below: `var slidingWindow = ingressStream`
Just so I have some clarity: what is the reason for adding the ClipEventDuration here? It seems to undermine the sliding window, since there are not going to be any overlapping lifespans.

I am trying to use ClipEventDuration to truncate the last event with the same key and replace it with the latest one. Child A arrives with a quantity of 1M, then the quantity is updated to 2M; I send the latest Child A into Trill and want Trill to treat them as the same object within the sliding window. Hope this helps.

(One side note while I think about the issue: if you reuse a stream variable, such as in `stream.ClipEventDuration(stream, ...)`, you will want to wrap that in a `.Multicast`, such as `stream.Multicast(s => s.ClipEventDuration(s, ...))`; doing so will prevent multiple subscription chains down to your ingress source.)

I think I may see the problem. Because your ClipEventDuration uses the full object identity on both sides (`x => x`) as the key, quantity is taken into consideration as part of that equality test. Thus you will not be treating child A with quantity 1M as the same as child A with quantity 2M. To do that, you'll want your keys to be only those fields that you want considered for equality, maybe something like `x => new { x.id, x.parentid }`.
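To see why the full-object key `x => x` behaves differently from a key on just the identifying fields, here is a small plain-C# sketch that does not use Trill. It assumes a hypothetical `ChildOrder` record with `Id`, `ParentId`, and `Quantity` properties; C# records compare all of their properties, which mirrors what happens when every field takes part in equality. The value-tuple key `(Id, ParentId)` plays the role of the `x => new { x.id, x.parentid }` selector suggested above.

```csharp
// Hypothetical payload: records get value equality over *all* properties.
public record ChildOrder(long Id, long ParentId, decimal Quantity);

public static class KeyDemo
{
    // Full-object key, analogous to x => x: Quantity takes part in equality.
    public static bool SameByFullObject(ChildOrder a, ChildOrder b) => a == b;

    // Composite key on the identifying fields only, analogous to x => new { x.Id, x.ParentId }.
    public static (long Id, long ParentId) Key(ChildOrder x) => (x.Id, x.ParentId);

    // Two orders match if their identifying fields match, whatever their quantities.
    public static bool SameByKey(ChildOrder a, ChildOrder b) => Key(a).Equals(Key(b));
}
```

With `new ChildOrder(1, 2, 1_000_000m)` and `new ChildOrder(1, 2, 2_000_000m)`, `SameByFullObject` is false while `SameByKey` is true, which is the distinction the clip key needs to capture.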
My model object does implement the IEquatable interface, and I implemented GetHashCode and Equals using the key you are suggesting. Are you saying Trill is not using my object's equality by default?

If your class implements IEquatable, we should indeed be using it. I just double-checked the clip operation, and it does use it. It's a little strange to have IEquatable simply ignore fields with relevant data... I'm curious whether the issue goes away if you omit the quantity field altogether, or if you change the quantity fields to all have the same value.
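For reference, an `IEquatable<T>` implementation that compares only the identifying fields, as the previous comment describes, might look like the sketch below. The property names `Id`, `ParentId`, and `Quantity` are assumptions, since the original class body was not preserved. `Equals(object)` and `GetHashCode` are overridden alongside `Equals(T)` so that every equality path agrees and equal objects hash alike; as noted above, it is unusual for equality to ignore a field that carries data, so an explicit key selector may be the clearer choice.

```csharp
using System;

// Hypothetical shape of ChildOrderModel: equality deliberately ignores Quantity.
public sealed class ChildOrderModel : IEquatable<ChildOrderModel>
{
    public long Id { get; set; }
    public long ParentId { get; set; }
    public decimal Quantity { get; set; }

    // Typed equality: only Id and ParentId are compared.
    public bool Equals(ChildOrderModel other) =>
        other is not null && Id == other.Id && ParentId == other.ParentId;

    // Untyped equality delegates to the typed version so both paths agree.
    public override bool Equals(object obj) => Equals(obj as ChildOrderModel);

    // Must hash exactly the fields Equals compares, or hash-based lookups break.
    public override int GetHashCode() => HashCode.Combine(Id, ParentId);
}
```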
I set the quantities to be the same and put breakpoints in my object's GetHashCode and Equals methods; they are being called. I also rewrote the query with Multicast, but unfortunately I still get a weird result. See my query and my object below:
`public class ChildOrderModel : IEquatable<ChildOrderModel>`
Any other ideas? It seems like such a simple use case, but the results don't make any sense.
I'm curious whether you also hit a breakpoint in Equals if you override it. If you'd like, you can also put together a standalone test case file and I can investigate further from there.

I do hit the breakpoint on Equals as well, once. Yes, I would really appreciate it if we could get to the bottom of this use case, as it is an important one for me. Just to reiterate: I am trying to count children within a sliding window; the only caveat is that my child objects get updated, and I want Trill to understand that and use the latest version inside the window.

OK, if you can drop a .cs file here, I'll pick it up and debug further.

Here you go; thanks in advance for your help.
OK, I think I have some answers for you. Part of the issue is that, either because of the JIT or because C# code is magically fast, several of your DateTime.Now calls are returning the same value. You're creating 4 values (for now, let's call them A, B, C, and E, just because I'm mean) and they're being assigned time values like this: This pattern means that ClipEventDuration doesn't do what you'd expect. You're going to get A clipped by B, C, or E (take your pick), while B, C, and E are all unclipped. What you'll need to do in code is make sure that your input data is not simultaneous, because ClipEventDuration is not going to remove that simultaneity. A couple of other small points:
Here's the code I ended up with after a little playing around. I hope this helps!
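The effect of simultaneous timestamps can be reproduced with a simplified model of `ClipEventDuration` in plain C#: each event's end is cut to the start of the next event with the same key whose start time is strictly later. This is a sketch of the behavior described above, not Trill's implementation, and `Ev`, `ClipModel`, and `Clip` are hypothetical names. With A at tick 1 and B, C, and E all at tick 2, A is clipped at tick 2 while B, C, and E stay unclipped, because none of them starts strictly after the others.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event: a name, a key, a start time, and an end time (long.MaxValue = open-ended).
public record Ev(string Name, int Key, long Start, long End);

public static class ClipModel
{
    // Simplified clip: end each event at the earliest start of a same-key event
    // that begins strictly after it; events sharing a start time never clip each other.
    public static List<Ev> Clip(IReadOnlyList<Ev> events) =>
        events.Select(e =>
        {
            long nextStart = events
                .Where(o => o.Key == e.Key && o.Start > e.Start)
                .Select(o => o.Start)
                .DefaultIfEmpty(long.MaxValue)
                .Min();
            return e with { End = Math.Min(e.End, nextStart) };
        }).ToList();
}
```

Spreading B, C, and E over distinct ticks in this model makes each one clip its predecessor, which is why removing the simultaneity (or the duplicates) matters.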
Thank you so much for the analysis and the reply; it makes sense. I will study the Stitch operator in detail.

I also tried running your code, but I get no elements in the result list. Am I missing any other code?

You're getting the empty result because of a bug in the Stitch operator that I fixed but had not pushed to master yet. I've pushed the fix to the branch called "Provider" for the moment, and will try to push it to master next week. How you create your events really depends on the query you are trying to execute; however, creating start edges shouldn't result in a memory blow-up.

Got it, I will wait for the new version and try again. Thanks again. Earlier you wrote "What you'll need to do in code is make sure that your input data is not simultaneous, because ClipEventDuration is not going to remove that simultaneity." I was hoping that this would only be an issue in testing, but it looks like I am getting simultaneous timestamps in real life as well. I don't see a good way to ensure that timestamps are unique; I browsed System.Reactive, and there is not much there for delaying each individual element occurrence. Any idea how I should tackle that?
If the events that are simultaneous are also equivalent to one another (in that they should only be counted once, as in your other cases), then I would suggest applying Distinct() over them to remove all but one of the simultaneous events. The multiplicity of equivalent data that is also simultaneous seems to be the issue for your query, so removing the duplication would make the simultaneity irrelevant.

I am not sure Distinct will work, as it won't pass the latest element through if it's equal to the previous one. In my case, two child orders may be equal but have different quantities, so I need both of them in Trill. I don't want to add quantity to my equality criteria.

Distinct in Trill does not eliminate duplicates over all data across all time, just within each timestamp. So if you have data value A valid from t1 until t2, and value A again valid from t3 until t4, then if t3 > t2, Distinct will have no effect. If t3 < t2, then from t3 until t2 one of the A's would be flagged as a duplicate and removed.
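The per-timestamp behavior of `Distinct` described above comes down to interval overlap. The plain-C# sketch below (a hypothetical `DuplicateSpan` helper, assuming half-open lifetimes) returns the span during which two lifetimes of the same value coexist, which is where one copy would be dropped, or `null` when the lifetimes do not overlap. It models the reasoning only and does not call Trill.

```csharp
using System;

public static class DistinctModel
{
    // Two lifetimes [t1, t2) and [t3, t4) of the same value.
    // Returns the span where both are active (one copy would be dropped), or null if disjoint.
    public static (long From, long To)? DuplicateSpan(long t1, long t2, long t3, long t4)
    {
        long from = Math.Max(t1, t3);
        long to = Math.Min(t2, t4);
        if (from < to) return (from, to);
        return null;
    }
}
```

For instance, lifetimes [0, 10) and [12, 20) give `null`, while [0, 10) and [5, 20) give (5, 10), that is, from t3 until t2.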
Got it, I will try it and report back.
dear @cybertyche |
Dear @afriedma, you mentioned "I am getting around this issue at the moment by merging a punctuation stream with interval of x, which forces Trill to generate output." |
Hi @nsulikowski, see sample code below:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.StreamProcessing;

// Child-order updates, plus a side channel used only for punctuations
ISubject<ChildOrderModel> ChildSubject = new Subject<ChildOrderModel>();
ISubject<StreamEvent<ChildOrderModel>> punctuationStream = new Subject<StreamEvent<ChildOrderModel>>();
var childStreamable = ChildSubject
    .Select(e => StreamEvent.CreateStart(e.StartTime.Ticks, e))
    .Merge(punctuationStream)
    .ToStreamable(DisorderPolicy.Adjust());
// Every 5 seconds, push a punctuation just past the current time so that Trill advances time and produces output
Observable.Interval(TimeSpan.FromSeconds(5)).Subscribe(t =>
{
    var timeTicker = DateTime.Now.Ticks;
    punctuationStream.OnNext(StreamEvent.CreatePunctuation<ChildOrderModel>(timeTicker + 1));
});
```

Hope this helps.

thank you v m
@afriedma A new version of the NuGet package has been built and pushed, and the Stitch fix is there and in the master branch.

@cybertyche thank you for your help and prompt feedback, I will try it out.
Hello,
I have the following query. My issue is that I don't get any output on my first publication to parentSubject and childSubject, but I do start to receive output on subsequent publications, though the output is from the previous publications. For example, I publish A, but I don't get the result for A until I publish B.
My impression was that the streams would get flushed every 1 and 2 seconds respectively and produce output on every flush.
I create events using `StreamEvent.CreateStart`.
```csharp
// parentSubject and childSubject emit StreamEvent<T> values created with StreamEvent.CreateStart
var parentStreamable = parentSubject
    .ToStreamable(null, FlushPolicy.FlushOnPunctuation, PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(1).Ticks));
var clippedParentStreamable = parentStreamable
    .ClipEventDuration(parentStreamable, x => x.Id, x => x.Id);

var childStreamable = childSubject
    .ToStreamable(null, FlushPolicy.FlushOnPunctuation, PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(2).Ticks));
var clippedChildStreamable = childStreamable
    .ClipEventDuration(childStreamable, x => x.Id, x => x.Id);
```