Timeout event generation #33
Hi @pandaadb, I have just released version 2.2.0, with a new feature: push_previous_map_as_event. You can find a good example here:
Hi @pandaadb, Several people have the same need as you. First: are you still interested in implementing this enhancement?
Hi @fbaligand, Sorry - I missed the update last time. I am definitely still interested in that. I have since continued working on the plugin and am using it in production (so it does seem to work). My branch/fork is here: https://github.com/pandaadb/logstash-filter-aggregate I have added a few extra options (which I am not sure are all needed), including:
I'm happy to discuss what could be useful/merged, if not all of them. I have learned a bit more about Ruby since starting on this, so I am hoping I didn't produce too much of a mess :)
Wow, that's a lot of options :) Concerning timeout_id: if I understand correctly, it is the field name that is added to the "timeout" generated event, right? Given your explanation, I'm not sure what you put inside it. Is it the task_id? Is it the task creation time? Anyway, could you rebase your branch, so that all your commits come after the last master commit?
Hi, yeah, timeout_id is a bit useless I think, and also confusing. It forces the task_id value to be present in the timed-out event, so that the timeout event can be matched back to the id that created it. For example: timeout_id => "hello". If an event now comes in, the filter looks at field "x", which for this example has the value "World". Then event[timeout_id] = map[task_id], which ends up looking like: event["hello"] = "World". So now the timeout event can be matched back to the start event, since we know that the field "hello" in the timeout event represents the task_id used for the start event. So in short: timeout_id is the field name used for the task_id that is used for aggregation. I am not sure what rebasing means? Do you mean create a new fork after your master and merge my branch in?
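A tiny Ruby sketch of that mapping (a plain Hash stands in for the Logstash event; the field names are the ones from the example above):

```ruby
# Sketch of the timeout_id idea using a plain Hash as a stand-in event.
timeout_id    = 'hello'   # configured field name for the timeout event
task_id_value = 'World'   # value of field "x" that keyed the aggregation

timeout_event = {}
timeout_event[timeout_id] = task_id_value

timeout_event['hello']   # => "World", so the timeout event can be matched
                         # back to the start event's task_id
```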
OK. I think an optional option called "timeout_task_id_field" is relevant. It would be set on the timeout event, and only if set in the configuration. Then I wonder: do you have an option saying that, on task timeout, the aggregation map is pushed as a new event?
First of all, I invite you to tag your present branch. Then, you can run these commands to add an upstream remote branch:
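The exact commands were not preserved in this thread; a typical sequence would look like the following (demonstrated in a throwaway repository so each step actually runs; in a real fork you would run steps 1-2 in your existing clone):

```shell
# Sketch only: the exact commands from this comment were not preserved.
set -e
repo=$(mktemp -d) && cd "$repo"
git init -q
git -c user.email=a@b -c user.name=demo commit -q --allow-empty -m "wip"

# 1. Tag the present branch so no work is lost:
git tag before-rebase

# 2. Add the original plugin repository as an "upstream" remote:
git remote add upstream https://github.com/logstash-plugins/logstash-filter-aggregate.git

# 3. Fetch it and rebase (needs network access, so left commented here):
#    git fetch upstream
#    git rebase upstream/master
```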
Then, either you rebase your code on your master or, if you prefer, on a special branch that you create from your master. I warn you: you will certainly have conflicts to resolve :) Anyway, when you merge/rebase, I think you will have some conflicts to resolve around timeout event creation, because there is already some code done for that, associated with the option "push_previous_map_as_event".
Hi, I will attempt the rebase later today and get back to you. About your question: I don't have an option that says the aggregation map is pushed. Instead I have an option, timeout_code, which does the same as the regular code option, except that it is only executed on timeout. The timeout_code gets the aggregation map and a new event, so the user has full control over what to do with the aggregation map in a timeout situation (e.g. in my case I add a few fields that indicate the timeout, and aggregate some other values from within the map, so just pushing the map as an event wasn't enough for me). I could imagine the timeout_code defaulting to simply mirroring the aggregation map, so that people who have no desire to do their own operations would simply get the map pushed as a new event.
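A hedged Ruby sketch of that idea (illustrative only, not the plugin's actual internals): on timeout, user-supplied code receives the new event and the aggregation map, and decides what ends up in the timeout event.

```ruby
# Stand-in for a user's configured timeout_code (names illustrative).
timeout_code = lambda do |event, map|
  map.each { |k, v| event[k] = v }  # default behaviour: mirror the map
  event['timed_out'] = true         # user-added marker field
end

aggregation_map = { 'task_id' => '42', 'duration' => 17 }
timeout_event   = {}                # stand-in for a newly created event
timeout_code.call(timeout_event, aggregation_map)

timeout_event['timed_out']   # => true
timeout_event['duration']    # => 17
```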
Hi, I like your 'timeout_code' idea. But I see it a little bit differently :
Hi @pandaadb, What do you think about my previous comment? Concerning the last 3 options you mention, I think they could be the object of another issue/pull request.
@fbaligand Hi! If I understand you correctly, we'd want to split it into 2 phases then:
I agree with you. I think the first option is the most useful. The second is quite close to my usecase and maybe not as useful to others. One other thing: what do you think about having an option that checks timeouts on each event? By default, events can only time out after 5 seconds. My usecase (though triggered by reparsing old data) needed a much more aggressive timeout behaviour. Lastly, sorry I haven't gotten around to rebasing or coding anything yet :/ I hope I will get to it this week. Thanks!
Hi, so I have started work on this. I did the following: I tagged my branch as it was. Then I created an actual branch and committed it (so I don't lose my changes). Then I rolled back to the point for event generation (without all the extra stuff of the time tracking based on file keys etc). So now I am doing the rebase and committing. Once I have merged successfully I will make the changes so they match the properties that we discussed. Once I rebase, I assume I get your changes as well? (Push map as event) Thanks, Artur
Hi, one question from my side: I don't know what this code of yours does:
If you could elaborate on that (it almost looks like the flush-on-all-events). The track-times-based-on-external-key idea works like this. Scenario: you are reparsing 1 month's worth of data. The input plugin guarantees that (if single-threaded) each file comes in in read order, so: T1 XYZ, where T1 < T2 and so on. This works OK, since T1 is always < T2 and the timeout does not occur. However, this has 2 problems:
So when E1 comes in, the event time is T1 (which might be 1 month in the past). So the variable @@last_eviction_timestamp = Time.now becomes: @@last_eviction_timestamp = event['my-timestamp-field']
With this in mind, if element.creation_timestamp is the timestamp of the event, this logic will no longer work: (element.creation_timestamp < min_timestamp). The creation_timestamp could be 1 month ago, while min_timestamp is calculated based on Time.now. Also, it can't work to track just 1 timestamp, since the nature of files coming in (as well as the nature of multithreading, say files A and C being read at the same time) would overwrite that timestamp. Hence: @@timestamp['my-unique-file-path'] = event['timestamp'] That way, the times are tracked on:
With that approach (in addition to expiring on every event), I can reparse all the logs and expire the events. Imagine events E1 to EX come in within 1 second, but they span 1 day (24 hours) of event time, and I want to expire events that are 15 minutes apart. With the above approach the events can come in as fast as they want, because I am tracking the event's timestamp, which will jump 24 hours in 1 second and correctly detect expiry based on the event's own time. OK, I hope this makes sense :) If not, let me know and I will write up a more concrete example.
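The per-key, event-time-based expiry described above can be sketched like this in Ruby (all names are illustrative; this is not the branch's actual code):

```ruby
# Expire based on the *event's* timestamp, tracked per file key,
# rather than on wall-clock time.
def expired?(last_seen, key, event_time, timeout_seconds)
  previous = last_seen[key]
  return false unless previous
  (event_time - previous) > timeout_seconds
end

last_seen = {}          # file path => latest event timestamp seen
timeout   = 15 * 60     # expire entries 15 minutes apart in event time

t1 = Time.utc(2016, 1, 1, 10, 0, 0)
t2 = Time.utc(2016, 1, 1, 10, 20, 0)   # 20 minutes later in event time

expired?(last_seen, 'file-A.log', t1, timeout)  # => false, nothing tracked yet
last_seen['file-A.log'] = t1
expired?(last_seen, 'file-A.log', t2, timeout)  # => true, even if both events
                                                #    arrived within one wall-clock second
```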
The code with "push_previous_map_as_event" is done particularly for the jdbc input use case. It is a very specific use case, where tasks come one after the other: first all task1 events, then all task2 events, etc. Is this clearer for you?
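For reference, the jdbc-style usage described here looks roughly like this with the options released in 2.2.0 (a sketch; the task_id field and code are illustrative, so check the plugin docs for your version):

```
filter {
  aggregate {
    task_id => "%{task_id}"
    code => "map['event_count'] ||= 0; map['event_count'] += 1"
    push_previous_map_as_event => true
    timeout => 120
  }
}
```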
Hi! Oh, that makes sense, thanks. Now I understand what the code does as well. My implementation will then:
So my config would be set accordingly, because I would have multiple task_ids (so my map is always of size > 1). Sounds like a plan :) Thanks for the clarifications!
Hi, I am having git troubles, I think. I am now at a point where I have merged the changes I wanted to merge, so the version I have locally has:
I think this is all we wanted to merge for the first version. However, rebase wants to continue merging my other changes as well (tracking timestamps on events). I don't want to merge those in, since I would have to remove them afterwards. I believe this is because my commit tree on master kind of looks like:
commit V1
So obviously I can skip steps 2, 3 and 4, because they cancel each other out. But I don't know how to stop rebasing after V1?
About your 4 implementation points, I fully agree with what you say. |
Concerning your git problem :
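The advice actually given here was not preserved in this thread. One standard way to keep only the wanted commit is to branch from the base and cherry-pick just V1, sketched in a throwaway repository so the steps run end to end:

```shell
# Sketch only: build a clean branch containing just the wanted commit.
set -e
repo=$(mktemp -d) && cd "$repo"
git init -q
g() { git -c user.email=a@b -c user.name=demo "$@"; }

echo base > plugin.rb;   git add plugin.rb;   g commit -q -m "master base"
echo v1   > timeout.rb;  git add timeout.rb;  g commit -q -m "V1: timeout event generation"
v1=$(git rev-parse HEAD)
echo v2   > tracking.rb; git add tracking.rb; g commit -q -m "V2: timestamp tracking (unwanted)"

# Branch from the commit *before* V1, then take only V1:
git checkout -q -b first-version HEAD~2
g cherry-pick "$v1" > /dev/null
```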
If it's not clear or you need more help, tell me.
Hi Fabien! Thanks for your help :) I struggled a bit because I tried to push a branch to the logstash-plugins repo (to which I obviously have no access). I managed to fix it now, and I think the first version stands: https://github.com/pandaadb/logstash-filter-aggregate/ What I did:
Would you like a pull request, or would you rather look it over first? (Edit: the docs don't match anymore; I will update them once the code is OK.) Thanks! Artur
You can create a PR! Fabien
Cool, I created #37 :) Thanks! Artur |
@pandaadb release 2.3.0 is done, with timeout event generation!
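For readers arriving here later, the options that came out of this thread look roughly like the following in a filter config (a sketch; option availability and the task_id field shown are assumptions, so check the plugin documentation for your exact version):

```
filter {
  aggregate {
    task_id => "%{task_id}"
    code => "map['count'] ||= 0; map['count'] += 1"
    push_map_as_event_on_timeout => true
    timeout => 600
    timeout_task_id_field => "task_id"
    timeout_code => "event.set('timed_out', true)"
  }
}
```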
Yey :) I hope all works as expected and people enjoy the new feature! Pleasure working with you! |
It was a pleasure for me too! Nice news :)
Breaking news: the official logstash documentation has just been updated with your new options and your sample!
That's great :) I wonder if the documentation needs to be updated, though? I believe there is a breaking change in Logstash 5+ where you cannot handle the event like a hash anymore; you need to use get and set instead. So instead of the code being:

it should be:

For example, in Example 2 you have:
Yes, there is a breaking change in 5.0, where the logstash event becomes a Java bean and is no longer a Ruby object. Anyway, it is important that the official logstash documentation has been updated, because a lot of people only look at that documentation and never look at the github site or the plugin code. Fabien
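The API change can be illustrated with a tiny stand-in class (used here only so the snippet runs outside Logstash; on 5.x the real object is a LogStash::Event exposing get/set):

```ruby
# Minimal stand-in for a Logstash 5 event, which exposes get/set
# instead of hash-style access.
class FakeEvent
  def initialize
    @data = {}
  end

  def get(field)
    @data[field]
  end

  def set(field, value)
    @data[field] = value
  end
end

event = FakeEvent.new

# Logstash < 5 style (no longer works on 5.x events):
#   event['duration'] = 42
#   total = event['duration']

# Logstash >= 5 style:
event.set('duration', 42)
total = event.get('duration')   # => 42
```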
For info, I just released version 2.3.1, with a new option:
Was this ever implemented, or is there a way to do this? I am processing some historical logs and this feature would be extremely useful |
No, there is no such feature. If you need one, I invite you to open a specific issue.
Ok, I'll open an issue. |
Hi guys!
I have seen a discussion in a different thread, but it was closed. Today I came across a requirement for our usecase where we want to aggregate our data, but we do not have end events. So I googled a bit, found this plugin, and thought the timeout usecase would be ideal for me. I cloned your repo and implemented the following:
This field is the same as code, but it is code that will be executed for the timeout action. I am not sure if this is necessary or if I can simply reuse the code property.
Whenever flush is called, I now generate a new event per key in the aggregated map. The key is added by default, as is the creation timestamp. The key is mapped to "timeout_id".
If no timeout_code is defined, nothing happens (I am hoping this ensures backwards compatibility).
Instead of comparing against the timestamp of creation, I now update a "last_modified" property each time a new event is aggregated. This way I can aggregate over a longer period of time without the aggregation being cut off because the timeout measured from the start event was reached.
I added test cases for the changes that I have made.
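The flush and last_modified behaviour described above can be sketched like this (plain Hashes stand in for Logstash events; all names are illustrative, not the actual changeset):

```ruby
# Refresh a last_modified stamp on every aggregated event, and on flush
# emit one event per expired map key, with the key under "timeout_id".
now = Time.utc(2016, 1, 1, 12, 0, 0)

aggregate_maps = {
  'task-1' => { 'count' => 3, 'last_modified' => now - 3600 },  # idle 1 hour
  'task-2' => { 'count' => 7, 'last_modified' => now - 60 },    # recently active
}
timeout = 600  # seconds of inactivity before a map is flushed

# A new event for task-2 arrives: aggregate it and refresh last_modified,
# so an active aggregation never expires mid-stream.
aggregate_maps['task-2']['count'] += 1
aggregate_maps['task-2']['last_modified'] = now

# On flush: expire on inactivity, one generated event per expired key.
expired, alive = aggregate_maps.partition do |_key, map|
  (now - map['last_modified']) > timeout
end

flushed_events = expired.map do |task_id, map|
  map.merge('timeout_id' => task_id)  # stand-in for LogStash::Event.new(map)
end

flushed_events.map { |e| e['timeout_id'] }   # => ["task-1"]
```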
You can find my changeset here: pandaadb@ba57fac
I would be happy to create a pull request if you guys think that this is a useful feature.
I am coming from a java background and am not too experienced with ruby coding, so if there are serious issues with my code, please give me a chance to correct them :)
Let me know what you think,
thanks,
Artur