feature: be able to filter streams by field #5827

Open
Eywek opened this issue Feb 7, 2019 · 22 comments

Comments

@Eywek

Eywek commented Feb 7, 2019

Overview

It would be great to add an option to the XREAD/XREADGROUP commands to filter a stream's messages by field.
For example, if we push some messages like this:

XADD my-stream * my-field my-content
XADD my-stream * my-field-2 my-content
XADD my-stream * my-field my-content

I would like to be able to read only new messages with the field = my-field.
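
For comparison, this is roughly what the same filter looks like when done on the client today. It is only a sketch: `filter_by_field` is a hypothetical helper, the entry IDs are made up, and the entries are assumed to arrive as `(id, {field: value})` pairs, which is the shape redis-py returns when the client decodes responses.

```python
from typing import Dict, Iterable, List, Tuple

# One stream entry: (entry ID, mapping of field name to value).
Entry = Tuple[str, Dict[str, str]]


def filter_by_field(entries: Iterable[Entry], field: str) -> List[Entry]:
    """Keep only the entries that contain the given field name."""
    return [(entry_id, fields) for entry_id, fields in entries if field in fields]


# Entries shaped like the three XADD calls above (IDs are illustrative).
entries = [
    ("1-0", {"my-field": "my-content"}),
    ("2-0", {"my-field-2": "my-content"}),
    ("3-0", {"my-field": "my-content"}),
]

matching = filter_by_field(entries, "my-field")
print([entry_id for entry_id, _ in matching])  # ['1-0', '3-0']
```

The client still receives every entry before discarding the ones it does not need, which is the cost the request is trying to avoid.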

Use case

At my company we use Redis streams to push data from our APM (metrics, exceptions...) and we have 2 services reading the streams.
We use the stream name as data type and the field as client identifier, for example, we could push a message like this:

XADD metrics * <user-id> <payload>

To read data we use XREADGROUP in a service called digester, which puts the data into our datastore. We also have a service called realtime, which emits data in real time to our interface.

The problem is that the realtime service needs to read messages from the stream in real time (we can't use the COUNT parameter, otherwise we would accumulate delay). BUT the realtime service receives all messages from the stream (with XREAD STREAMS metrics 0-0) and we don't need that; we only need to listen to messages sent by a specific user (who is connected to the service via websocket).
So, if we could filter by field, it would be possible for us to read only the messages from a specific user.

Before, we were using pub/sub and this system allowed us to filter by field... But streams are now a better solution for us: they provide more fault tolerance if our datastore is down, and better scaling for our digester service via stream consumer groups.

Comments

It would be great if this could be implemented, or if there is an alternative way to do this kind of thing.

@itamarhaber
Member

Hello @Eywek

As alternatives to server-side filtering (or any transformation/projection/whatever), you can consider using pipelines of streams and/or dedicated streams. In a pipeline, you'd have the metrics stream processed by a new pre-realtime process, that will add only filtered messages to a new stream - perhaps called "realtime" - that will be, in turn, consumed by the current realtime process.

Or, if you can have your user write its messages to a dedicated stream in addition to or instead of the metrics stream, then you can have your realtime process only consume that stream.
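
A rough sketch of the pipeline idea, not a definitive implementation. It assumes a redis-py-style client created with `decode_responses=True` and the default RESP2 reply shape; `forward_matching` and the `count` default are hypothetical, and the stream names are taken from this thread.

```python
from typing import Any


def forward_matching(client: Any, source: str, dest: str, field: str,
                     last_id: str = "0-0", count: int = 100) -> str:
    """Copy entries of `source` that carry `field` into `dest`.

    Returns the ID of the last entry scanned, so the caller can pass it
    back in as `last_id` on the next call.
    """
    # Non-blocking XREAD: returns an empty list when nothing new exists.
    reply = client.xread({source: last_id}, count=count)
    for _stream, entries in reply or []:
        for entry_id, fields in entries:
            if field in fields:
                # Re-publish the matching entry; the server assigns a new ID.
                client.xadd(dest, fields)
            # Advance the cursor even for entries that were filtered out.
            last_id = entry_id
    return last_id


# Possible usage (needs a running server, so it is left commented out):
# import redis
# r = redis.Redis(decode_responses=True)
# cursor = forward_matching(r, "metrics", "realtime", "<user-id>")
```

The filtering work still happens somewhere; this only moves it out of the realtime process and into a separate one.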

Before, we were using pub/sub and this system allowed us to filter by field

I'm not following this sentence - was it with Redis' Pub/Sub and if so how?

@Eywek
Author

Eywek commented Feb 8, 2019

Thanks for the reply. I think having a pre-realtime process doesn't resolve our problem; we only shift it to another process (and reading all messages is heavily cpu intensive).

I already thought about dedicated streams, but it gets complicated to monitor and read the streams with the digester service.

I was talking about Pub/Sub to give an example of filtering and why we can't use them now.

@itamarhaber
Member

reading all messages is heavily cpu intensive

Something has to do it... what you're suggesting is basically having the Redis (mostly single-threaded IIRC) server be that something.

You can scale the consumption of a Stream with a consumer group, and split the intensive work between its members.
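
As an illustration, one member of such a consumer group could look roughly like this. It is a sketch under assumptions: a redis-py-style client created with `decode_responses=True`, a group that already exists on the stream, and hypothetical names (`consume_once`, `handle`, the group and consumer names).

```python
from typing import Any, Callable, Dict


def consume_once(client: Any, stream: str, group: str, consumer: str,
                 handle: Callable[[str, Dict[str, str]], None],
                 count: int = 10) -> int:
    """Read up to `count` new entries for this consumer, process and ack them.

    Returns the number of entries handled in this call.
    """
    # ">" asks for entries never delivered to any consumer of the group,
    # so the group's members split the stream between them.
    reply = client.xreadgroup(group, consumer, {stream: ">"}, count=count)
    handled = 0
    for _stream, entries in reply or []:
        for entry_id, fields in entries:
            handle(entry_id, fields)
            # Acknowledge so the entry leaves this consumer's pending list.
            client.xack(stream, group, entry_id)
            handled += 1
    return handled


# Possible usage (needs a running server and an existing group):
# import redis
# r = redis.Redis(decode_responses=True)
# consume_once(r, "metrics", "digesters", "worker-1", lambda i, f: print(i, f))
```

Running several processes with different consumer names against the same group spreads the entries across them.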

@vmarchaud

vmarchaud commented Feb 13, 2019

Disclaimer: I work with @Eywek on the same project

We can't split it, by design: any user subscribing to a specific realtime app needs to get all the messages, so if we use a consumer group, any message could be read by the wrong app, where no user is listening.
EDIT: Didn't understand your message sorry

We are indeed suggesting that Redis do the work, since it already has all the messages. What we are looking for is some kind of filter on XREAD that allows filtering by user-id when reading messages.
We of course have other ways to do it (using the pub/sub API, for example), but we think it would be relevant for Redis to have this system in place.

@andrenth

Maybe run a Lua function to act as a filter. That would be nice to have.

@Eywek
Author

Eywek commented Mar 26, 2019

The xread command is not allowed in lua functions

127.0.0.1:6379> eval "return redis.call('xread', 'streams','count', '10', 'status','0-0')" 0
(error) ERR Error running script (call to f_4fb52f570df9c2269b5f849e428ec6f084fca871): @user_script:1: @user_script: 1: This Redis command is not allowed from scripts

@andrenth

Yeah, just saying it could be a way to implement filtering by content, by allowing it to be used from lua functions.

@alquist42

How about letting XADD combine an explicit aggregate/entity ID with the part auto-generated by Redis (*)?

> XADD mystream 123-* foo bar
123-1518951480106-0

this way we still have all the (really wanted) auto-generated behaviour, but also ability to filter by ID.

all events for 123
> XRANGE mystream 123 123
all events for 123 for some period
> XRANGE mystream 123-1518951480106 123-1518951480107

or maybe even XREAD STREAMS mystream 123-0 - listen only for events for 123

can it work?

@bklooste

bklooste commented Aug 24, 2020

While a proper implementation would just put the filtered entries into another stream, it is extremely useful to be able to do loose text searching within a field for debugging / prod troubleshooting, e.g. within JSON bodies (even if slow). So if the filter could match a substring of a value, that would be great!

@itamarhaber itamarhaber added this to Done in 6.2 Aug 25, 2020
@itamarhaber itamarhaber moved this from Done to To do in 6.2 Aug 25, 2020
@oranagra oranagra removed this from To do in 6.2 Oct 19, 2020
@Kama0165

Kama0165 commented Nov 3, 2020

@itamarhaber and @oranagra There isn't any comment on whether this feature was added or not and, if it was added, what the syntax for using it is. Is there documentation for it? If so, would you please link it? I have spent many hours searching the web and didn't have much luck finding any documentation.

@itamarhaber
Member

Hello @Kama0165

This feature wasn't added yet. We are considering it, or a semblance of it, for a future release (and in that case, you should expect it to be properly documented at redis.io, of course).

@guybe7
Collaborator

guybe7 commented Dec 9, 2020

@Eywek @Kama0165 a few things to mention:

  1. while filtering XREAD should be fine, we can't do that in XREADGROUP (because every entry read by XREADGROUP absolutely has to go into the consumer PEL - where will the filtered entries go to?). so far the commands have the same syntax, i wouldn't want to change that.
  2. starting from version 6.2 both XREAD and XREADGROUP will be allowed in lua (without the BLOCK modifier) so you can do the filtering with a 3-line lua script (commit: 9a1843e)
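
One way such a filtering script might look, wrapped in a small Python helper. This is a sketch, not the maintainers' implementation: it assumes a server version that permits XREAD (without BLOCK) inside scripts, a redis-py-style client with an `eval` method, and hypothetical names (`FILTER_SCRIPT`, `read_filtered`). The script also returns the last ID it scanned, so the caller can keep advancing even when every entry was filtered out.

```python
from typing import Any, List

# Lua: read a batch with XREAD, keep entries that contain the wanted field.
# KEYS[1] = stream, ARGV[1] = last seen ID, ARGV[2] = COUNT, ARGV[3] = field.
FILTER_SCRIPT = """
local reply = redis.call('XREAD', 'COUNT', ARGV[2], 'STREAMS', KEYS[1], ARGV[1])
if not reply then return {ARGV[1], {}} end
local last = ARGV[1]
local out = {}
for _, entry in ipairs(reply[1][2]) do
  last = entry[1]
  local fields = entry[2]
  -- fields is a flat array: name1, value1, name2, value2, ...
  for i = 1, #fields, 2 do
    if fields[i] == ARGV[3] then
      out[#out + 1] = entry
      break
    end
  end
end
return {last, out}
"""


def read_filtered(client: Any, stream: str, last_id: str, field: str,
                  count: int = 100) -> List[Any]:
    """Run the script; returns [last_scanned_id, matching_entries]."""
    return client.eval(FILTER_SCRIPT, 1, stream, last_id, count, field)


# Possible usage (needs a running server, so it is left commented out):
# import redis
# r = redis.Redis(decode_responses=True)
# last_id, entries = read_filtered(r, "metrics", "0-0", "<user-id>")
```

Because the script cannot block, the caller has to poll, which differs from a blocking XREAD on the client.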

@Eywek
Author

Eywek commented Dec 9, 2020

I think @alquist42's proposal is quite good and maybe simpler, don't you?

@guybe7
Collaborator

guybe7 commented Dec 9, 2020

the thing is, while we can have XREAD reading information regarding only a subset of the existing entries, for XREADGROUP it's impossible - and i don't think it's a good idea to have such a big difference between XREAD and XREADGROUP

also, i forgot to git fetch - XREAD and XREADGROUP are allowed in scripts since 6.0.0

@Eywek can you explain what it is in the lua solution that you don't like?

@Eywek
Author

Eywek commented Dec 9, 2020

It's not that I don't like the lua solution, I only think that an implementation in redis itself could be more efficient, but maybe I'm wrong.

Anyway, I'm no longer at the same company and I don't have this use case anymore, so maybe it's better that someone who still has this issue gives their opinion about it.

@bklooste

It's trivial to do in code. The main use case for me is casual examination of messages without knowing the time, e.g. troubleshooting. Troubleshooting is not a good time to create scripts...

I see no reason why XREAD and XREADGROUP should be the same. I never use XREADGROUP for troubleshooting; it's an app thing.

@jmoz

jmoz commented Nov 7, 2022

This would be a fantastic feature to add. I'm currently using streams to store per time period all stats/metrics whatever they may be. Being able to filter the fields returned would help optimise the data transfer.

At the moment, say I'm storing 1000 fields, but my client or frontend etc may only need a few of those.

@guybe7
Collaborator

guybe7 commented Nov 7, 2022

@Eywek you mentioned that your realtime service always reads all entries (XREAD STREAMS metrics 0-0) - would it help you if we added the filter arg to XRANGE instead of XREAD[GROUP]?

@Eywek
Author

Eywek commented Nov 7, 2022

@Eywek you mentioned that your realtime service always reads all entries (XREAD STREAMS metrics 0-0) - would it help you if we added the filter arg to XRANGE instead of XREAD[GROUP]?

Lgtm

@jmoz

jmoz commented Nov 7, 2022

@guybe7 a filter on both XREAD and XRANGE would also be really useful

@guybe7
Copy link
Collaborator

guybe7 commented Nov 7, 2022

@itamarhaber perhaps we should consider what was suggested in #7695

since implementing a filter is basically impossible for XREADGROUP (because we can't "skip" entries, CGs PEL has to be consecutive), and we don't want XREAD to diverge too much, maybe we should add filtering to X[REV]RANGE?

@guybe7
Copy link
Collaborator

guybe7 commented Nov 7, 2022

@jmoz as i explained in #5827 (comment) we don't want to create a big difference between XREAD and XREADGROUP, since they use the same function, which is complex enough as it is
