ReQL proposal: restarting feeds #3471
Comments
Also note, the opaque timestamp value is always monotonically increasing.
My guess is that this is probably not too difficult in a basic form. There are a few open questions:
Point 3 will become especially relevant when we implement automatic failover. A minor detail: In our current backfilling logic, the "drop potentially obsolete key range" requests are restricted to a given hash shard. We have to either expose the hashing function to the user in some way, or make sure that we re-send the given key range over all hash shards.
Another thing to note is that our backfill implementation relies on taking a snapshot of the whole table while it's running. We have to think about how that interacts with changefeeds that might be open for a long time because the client is slow in reading all the (initial) data of the cursor. Specifically I'm concerned about memory consumption on tables with high write loads. Can I change my "My guess is that this is probably not too difficult in a basic form." to a "This is a significant amount of work, but seems feasible."?
WRT snapshot memory issues, I'd also ignore that for now. We already run into that in the case of backfills, so I wouldn't worry about it too much until later. If we really wanted to be careful here, we could just terminate the feed if the snapshot gets too big (i.e. the user is too far behind), and they could restart the feed. I think it'll be really important to do this quickly after 2.0 since it's a huge limitation of the current system, so I'd try to cut scope in every way possible. I suspect we can get away with doing very little with respect to many of the operational issues like these.
I personally would always set `return_initial` to `True`.

@coffeemug you're probably thinking of specific use cases when you say that.
You'd want it to be true any time you're loading a realtime web page.
@coffeemug I think solving the snapshot memory issue is worth looking into. It might not actually be that difficult to do.
Actually, thinking more about it, we can probably avoid implementing pageable snapshots in the cache. Instead we could use a similar technique to what is suggested in #1944. We would stream the initial results in batches of small primary key ranges. After every range, we would send the user a new opaque timestamp that reflects the fact that we have backfilled up to a given replication timestamp for that small range. If we implement this, we can either start streaming changes for a given key range to the user as soon as we have "backfilled" all initial results for that small range, or keep accumulating them in a disk-backed queue until the initial results for the whole table have been sent.
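A toy sketch of that idea in plain Python (not RethinkDB internals; the token format and `repl_ts` are invented for illustration): the server walks the primary-key space in small ranges and emits a resume token after each one.

```python
import json

def stream_initial(rows_by_key, batch_size=1000, repl_ts=42):
    """Yield initial results in small key ranges, with a token after each."""
    keys = sorted(rows_by_key)
    for i in range(0, len(keys), batch_size):
        batch = keys[i:i + batch_size]
        for key in batch:
            yield {"new_val": rows_by_key[key]}
        # Token meaning: "all keys <= batch[-1] are current as of repl_ts";
        # changes in that range can now be streamed (or queued) to the client.
        yield {"token": json.dumps({"upto": batch[-1], "ts": repl_ts})}
```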
Ah I see. I think this is not too bad then:
It sounds confusing because the user has to know which queries return a stream and which return a datum. In practice, at least with the types of changefeeds that we currently support, I think it's sufficiently clear though.
You guys had mentioned something like

```
{id: <feed_uuid>,
 feed: <FEED_PSEUDOTYPE>}
```

So the drivers could maybe do something like:

```
for item in r.db('system').table('changefeeds').get(<feed_uuid>)('feed'):
    print item['new_val'], item['old_val']
```
Here's an alternate proposal that would take a lot less work to implement:

```
1> r.table('test').changes() => non-restartable changefeed
1> r.table('test').changes(persist: 'my_changefeed') => restartable changefeed
1> CRASHES
2> r.table('test').changefeed('my_changefeed') => steal the restartable changefeed from (1)
2> r.table('test').changefeed('my_changefeed').delete => safely close the changefeed so it doesn't hang around taking up memory
```

We'd basically give people a way to create a named changefeed that exists above the connection level, so another client can steal that changefeed and keep reading changes from it in case the first client goes down. This wouldn't let people request all the changes starting at an arbitrary point in time, but it's way way easier to implement. (We could also keep the last batch sent around in memory, and have an optarg to indicate whether or not (2) receives the last sent batch a second time, which I think is usually what you'd want in the case where (1) dies.)
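Under this proposal, a client could recover from a crash with a loop like the sketch below (Python). The `persist` and `changefeed` commands are the hypothetical API from this comment, not shipped RethinkDB, and `process` stands in for application logic:

```python
import rethinkdb as r
from rethinkdb.errors import ReqlRuntimeError

def process(change):
    print(change)  # stand-in for real application logic

def consume_changes(conn):
    try:
        # Try to steal a named feed left over from a crashed client.
        feed = r.table('test').changefeed('my_changefeed').run(conn)
    except ReqlRuntimeError:
        # No such feed yet: create a restartable one under a well-known name.
        feed = r.table('test').changes(persist='my_changefeed').run(conn)
    for change in feed:
        process(change)
```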
What happens to changefeeds that people forget to delete?
They use up memory indefinitely, but not an arbitrary amount of memory (at most 100k changes right now). We could also add a reasonable timeout after which they're evicted (although it's debatable whether reasonable would mean "an hour" or "a week").
We could also make the timeout configurable as another optarg.
I like the @mlucy API better, but I like @coffeemug's guarantees better. If the process that was listening on the changefeed crashes under the @coffeemug proposal, I can always recover what happened in between. If we only store the last sent batch in memory, we solve the "can't recover a feed" problem, but we don't solve the "I'm sure I didn't miss a change due to a crash" problem.
If we're willing to make some compromises, we could design the API such that the user can't tell which method is being used under the hood. It would look something like this:

```py
>>> r.table("foo").changes(resumable=True)
[{'old_val': ..., 'new_val': ..., 'token': OPAQUE}]
# then after the crash
>>> r.table("foo").changes(resume_token=OPAQUE)
[{'old_val': ..., 'new_val': ..., 'token': OPAQUE}]
```

(I'm not sure if this is actually the right way to go. It seems like the naive implementation has a lot of hidden "gotchas" the user has to know about.)
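For example, a client under this API might persist the token durably and resume after a crash, as in this sketch (the token file, table name, and handler are illustrative, and `resumable`/`resume_token` are the hypothetical optargs from this comment):

```python
import os
import rethinkdb as r

TOKEN_FILE = "feed.token"  # illustrative durable store for the last token

def process(change):
    print(change)  # stand-in for real application logic

def run_feed(conn):
    token = open(TOKEN_FILE).read() if os.path.exists(TOKEN_FILE) else None
    query = (r.table("foo").changes(resume_token=token) if token
             else r.table("foo").changes(resumable=True))
    for change in query.run(conn):
        process(change)
        with open(TOKEN_FILE, "w") as f:
            f.write(change["token"])  # persist so a crash can resume from here
```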
That seems reasonable to me. If we always kept at least one old batch around until we evict the feed entirely, then it would be impossible for someone to process a row with a given token, crash, and not be able to restart at that token (i.e. they'd have to not be logging every token they process to whatever service they use to recover). That seems like an OK guarantee for a first pass, and we can make the guarantee stronger later.
I like @timmaxw's proposal. That way we could ship a first version using a ring buffer on the primary replica that:
We could think about maintaining resumable changefeeds over table reconfiguration, though on first thought that seems like work we can better spend on the improved implementation. Later we can follow up with an implementation that uses store timestamps.
I'm uncertain as to whether it's worth spending time on the first implementation rather than going for the second one right away. This will depend on how much work we expect the respective implementations to be.
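A toy version of the ring-buffer first version sketched above, in plain Python; the capacity, sequence-number tokens, and expiry error are illustrative:

```python
from collections import deque

class FeedBuffer:
    """Keep the last `capacity` changes; tokens are plain sequence numbers."""

    def __init__(self, capacity=100_000):
        self.buf = deque(maxlen=capacity)  # (seq, change); old entries fall off
        self.next_seq = 0

    def push(self, change):
        self.buf.append((self.next_seq, change))
        self.next_seq += 1

    def resume(self, token):
        # If the change after `token` already fell off the buffer, we can't
        # resume losslessly; error out so the client restarts from scratch.
        if self.buf and token + 1 < self.buf[0][0]:
            raise LookupError("token expired; restart the feed from scratch")
        return [change for seq, change in self.buf if seq > token]
```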
It's worth noting that this impacts #3564; using tokens in this way won't work with point changefeeds the way things are now.
For point changefeeds it's cheap to send users the initial value every time, so there's no need to make changefeeds resumable or persistent. The only exception I can think of is changefeeds on map-reduce. But then there's no backfill-like solution available, so we'd have to make the map-reduce trees persistent. At that point I'd want to make them explicit objects that the user can create and delete, like secondary indexes.
This is true for some but not all use cases. We support non-squashing point changefeeds, so it's plausible someone would want to use a changefeed to e.g. make sure a user's balance never dips below $0 and charge them an overdraft fee if it does. In that case "resuming" the changefeed by just re-sending the initial value doesn't get you what you want.
That also excludes backfill-based resuming. I think we shouldn't support non-squashing resumption. In fact, I'm suspicious of non-squashing changefeeds in general. If someone wants to do something special if the user's balance ever drops below zero, they should put that logic in the write queries.
Maybe in that one example that would be reasonable because you'd only want one or two places where you update someone's balance. In general, though, I think we should give people a way to write code that triggers whenever a certain change happens without forcing them to attach a copy of that code to all the write queries that could conceivably cause such a change.
I agree with @timmaxw that we shouldn't worry about non-squashing resumable changefeeds. As far as I can see, resumable and non-resumable changefeeds really have quite different types of use cases.
This portion of @timmaxw's proposal makes me nervous:
The example @danielmewes just pointed out - a web app that broadcasts events - would like to pick up where it left off after a crash, but would be incapable of eating an entire table's worth of data, deduping it with what the client already has, sending 4 million websocket events, etc.
In my comment above, I suppose I am implicitly assuming the second use case from @danielmewes' comment about replication to ElasticSearch vs a web app, for both my questions. Perhaps what I'm getting at is that in this implementation, it would be nice to have a flag that says "please do NOT dump me the entire table if I'm out of buffer."
@kofalt: That's a really good point. In fact, we should probably never dump the entire table when the user tries to resume a changefeed. If the server expired the changefeed, it should always throw an error instead of dumping the table. The client can catch this error and start a new changefeed from scratch.
If RethinkDB integrated reliable changefeed consumption with a resume feature, and group consumption with a load-balancing feature, we would immediately replace Kafka with it. But it does only half of the job. We thought about using it for our web front-ends, but the changefeed feature would overlap with the Kafka event notifications (notifications of changes), and we do not want to add to the tech stack. Making this changefeed feature more cluster-friendly is, IMO, a high priority. I am interested in any RethinkDB alternative that provides delivery guarantees and Kafka-like consumption of changes. It sounds to me like a unicorn so far...
Well, I'm not sure if the aim should be to replace Kafka...
Guys, you should take the reliable-changefeeds feature (one that survives client or server restarts) more seriously. This is a "must have" in the real-time world. Right now we have to use RethinkDB as a secondary DB or not use it at all; for example, by implementing event sourcing we end up ignoring many RethinkDB features.
@RXminuS I don't want the throughput that Kafka or RabbitMQ provide; I need a guarantee that a notification sent from RethinkDB is actually received by the client. That's all. I don't want to use RethinkDB for stream processing. RethinkDB should be positioned as the single-source-of-truth DB and should replace the message-broker stack used for notifications of changes in the DB.

Imagine: you create a microservice that uses changefeeds to listen for newly created/deleted users. This service must send an email to the admin on user creation/deletion. And voilà: the changefeed sends a message to the microservice, but the microservice doesn't receive it because of a problem with the internal network. So what should the microservice do? Use a timer to retrieve all data from the DB? What is the use case for changefeeds after all?
Wanted to pile on to the requests to give this issue more priority. As it stands, changefeeds can only be used for notifications for which data loss is not that big of an issue. Since the real-time capabilities, and with them changefeeds, are at the core of RethinkDB's value proposition, getting this issue fixed should be among the highest priorities in my opinion.

My specific use case: a polyglot architecture where RethinkDB is the single source of truth, and all other databases (Redis, Elasticsearch, ...) are read caches (or Eager Read Derivations, if you will) on top of this data. This is a great use case with tremendous widespread potential, but as it stands it's unusable, because the system as a whole cannot guarantee delivery (i.e., at-least-once semantics).

So again, please redirect efforts a bit more to this issue. Thanks
Hi, I just want to know the current status of this. Is anybody working on it? Thanks in advance.
@riyadhzen I think there is no work being done on this issue. The workaround is still to have all the records you are interested in carry a timestamp, and to persist this timestamp after each batch is processed. This will enable you to restart the changefeeds at any given point in time and will reduce the problem to something manageable.
@thomasmodeneis, I understood from your words that I should keep a timestamp in the documents; this way, if my feed gets disconnected, all I have to do is reconnect from the last timestamp onwards. Thank you.
Having just arrived here from the node-rethinkdb-job-queue thread, I just wanted to inquire about the state of this. As I understand it, changefeeds are unreliable. Is this correct? (Or is this only an issue in clustered deployments? Given the nature of our project – personal web sites – there should never really be a reason to cluster a deployment, as they will be deployments for one. I'm pretty sure that even if Stephen Fry decided to get one he wouldn't need a cluster.) :)

I was considering making RethinkDB the "single source of truth" for an ActivityPub server implementation I've just started working on but, after reading the posts here (e.g., @gebrits's), I'm reconsidering that. Can anyone using changefeeds in production please tell me if I'm in for a world of hurt if I decide to depend on them? Would love to hear from folks with practical experience in this area.

(Also, if changefeeds – a core, distinguishing, and loudly advertised feature – are unreliable, this should be mentioned in the documentation, right at the top. This is not something you want to find out when you're several months into dev.)
Hi, I don't know what you mean by node-rethinkdb-job-queue thread. Do you have a link?

Changefeeds are unreliable in the sense that they could fail, explicitly, and then if you make a new changefeed, you'll miss out on writes. They'll explicitly fail if one of the servers the feed is receiving changes from goes down or gets network partitioned. Or if the server you ran the query on goes down. Or if the client goes down.

You can hack your way around the problem of resuming a changefeed by putting a timestamp, either logical or wall-clock, on each row, then making an index by that timestamp (for efficiency), and then, when you want to restart a changefeed, doing so by starting a changefeed, and after that starting a range query on the secondary index by timestamp, piecing together the information that you've lost. There are of course issues to work out with timestamp monotonicity, which you can deal with but which add an extra level of discomfort. (I don't have practical experience in this area as a user -- I'm a developer of the DB -- so I don't have personal experience with the difficulty here.)

It would be possible to remove the issues with timestamp monotonicity if a server feature were developed that let your queries generate a (shard id, logical timestamp) pair that is guaranteed to increase (for each shard).
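A sketch of that workaround in Python, assuming a table `events` whose writes set an `updated_at` field (e.g. to `r.now()`) with a secondary index of the same name -- all of those names are illustrative. The handler must be idempotent, since rows can arrive twice where the catch-up query and the live feed overlap:

```python
import rethinkdb as r

conn = r.connect("localhost", 28015)

def handle(row):
    # Application-specific and idempotent: duplicates are possible where the
    # catch-up query and the live feed overlap.
    print(row)

def resume(last_seen):
    # Open the feed first, so no write can fall between catch-up and feed.
    feed = r.table("events").changes()["new_val"].run(conn)
    # Catch up on everything written at or after the last timestamp we saw.
    catchup = (r.table("events")
               .between(last_seen, r.maxval, index="updated_at")
               .run(conn))
    for row in catchup:
        handle(row)
    for row in feed:
        if row is not None:  # deletions surface as new_val == None
            handle(row)
```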
Thanks for the quick response, Sam. I meant the thread referenced above from the node-rethinkdb-job-queue project (grantcarthew/node-rethinkdb-job-queue#77).

So if I have a component, say, listening to a changefeed to distribute messages to followers, and the feed fails, I will get an immediate notification and can restart the feed (and if I'm using timestamps, I won't lose my place)? That sounds acceptable for my use case, if so. Reading some of the earlier issues, I was under the impression that the failures were not immediately reported but that there was some lag. Thanks again.
Hi aral - We've been using RethinkDB for over a year in production and initially leaned heavily on changefeeds. With a low amount of database traffic things seemed to be working quite fine. Once traffic increased and we added more changefeeds into the mix, all queries (reads / writes / changefeeds) were slowed down into an unusable state. Within the last month we removed our last changefeed from our server stack. We are still using RethinkDB as our primary NoSQL DB, but all realtime communication is now done through Redis pubsubs. Changefeeds hold a lot of great promise, but I would not currently recommend using them in a production environment.

In our setup we were using three RethinkDB nodes, each running on an i3.4xl EC2 instance. At about 15-20k concurrent changefeeds is when issues would crop up.
There should be some lag because it does take some time to notice that a server's not responding and decide that it's timed out. It might also be that there's some honest-to-god bug where a changefeed just never gets responses instead of a notification happening. The documentation does a poor job of describing what error conditions are possible and what sort of errors can be returned. So I've got some uncertainty there. @zappjones is right -- if you have a large number of changefeeds listening on key ranges (and not individual documents), you'll pay a huge CPU-time price to see if each write should be sent to each of the change feeds. (There could be a performance problem when you have a ton of change feeds on individual documents, too, but it wouldn't be for the same reason.)
Thanks, Sam. That's really useful information. I'm going to spike out RethinkDB because I'd love to implement its beautiful workflow if at all possible. It would be awesome to see changefeed issues better documented. Would you like me to open a separate issue for that?
@zappjones Thank you so much, that's invaluable information.
@srh Can you elaborate a bit more on this statement:
As I understand it, you mean that the website example:
will have "bad" performance already at ~15,000 active changefeeds? That would also be bad for us; we are currently evaluating RethinkDB but will have approximately 50,000-100,000 parallel active changefeeds with range queries (pagination). A lot of the live queries will be the same, so I am not sure if that will help with the performance (it might, if RethinkDB does internal deduplication of changefeed subscriptions).
I wasn't thinking of the limit example, just the range example (with small ranges, so that the overhead is "bad"). The limit example would also be hit by bad performance. Basically every write will interact with every range change feed, which costs 15000x work with 15000 change feeds. It is possible to improve the db implementation so that both this and limit change feeds would be reasonably fast, but I don't know anybody that has the free time for that right now.
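A toy model (plain Python, not RethinkDB code) of the overhead described above: each write must be checked against every open range changefeed, so the per-write cost grows linearly with the number of feeds. The ranges and key format are made up.

```python
import random
import string

# 15,000 subscribed key ranges, roughly the scale @zappjones reported
random.seed(1)
feeds = []
for _ in range(15000):
    lo = ''.join(random.choices(string.ascii_lowercase, k=4))
    feeds.append((lo, lo[:2] + "zz"))  # each feed watches a small key range

def feeds_to_notify(write_key):
    # A naive O(len(feeds)) scan on every single write -- the "15000x work".
    # An interval index over the range endpoints could cut this down.
    return [f for f in feeds if f[0] <= write_key <= f[1]]

print(len(feeds_to_notify("mmmm")), "of", len(feeds), "feeds matched")
```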
I'll have to check and see how identical subscriptions are treated. Various order_by/skip/limit change feeds ought to be handled together, and maybe those, and identical ranges, are handled more efficiently. I don't know that they are, so I can look sometime when I get the chance.
Note that change feed notifications don't have to happen synchronously with writes. If a write can mark an "area" with some sort of version number, then there could be worker threads that walk all change feeds and determine whether they should be notified, based on their last seen version number of that "area". (With "area" I am very generally referring to a section of stored data.)

Or, the threads are each responsible for a set of change feeds, and every time a write happens they check all feeds in a round-robin fashion, until they arrive again at the feed where they received the last write notification.
In fact, writes already do mark areas with a version number -- this is used for incremental replication. The current change feed API does provide old_val and new_val though. I'm not sure of the guarantees but I think there is one that values don't get "skipped" by a change feed.
Oh right, so then that's really only achievable with append-only writes…
Would be an awesome feature.
Without change feeds, this database is dead to me since it lacks most of what you need in a database out of the box.
The question of restarting feeds came up in #2953, #1118, and #2613. Since we've learned a lot since these issues were opened, I decided to start with a clean slate and a specific proposal.
When the user calls `changes()` on a stream, the feed protocol would inject an opaque timestamp into the results. The user could then pass the opaque timestamp back to `changes()` via the `return_initial` optarg to get all the changes since the timestamp. We'd be piggy-backing off existing replication logic, so we'd have to tell the user the range of "obsolete" keys they should delete, and then fill them in on the new values in that range.

A special value for `return_initial` is `True`, which backfills the user from scratch.

This would work on streams in general, like `t.map().changes()` and `t.filter().changes()`.

Note, if the changefeed is set up on a datum (e.g. a single document), the only legal values for `return_initial` are `True` or `False`.

There is a question of what the default for `return_initial` should be. I can see a couple of options:

- Default to `False`. This is annoying for datum feeds (e.g. a feed on a document).
- Default to `True`. This may be annoying for stream feeds (e.g. a feed on a table).
- Default to `False` on streams and `True` on datums. This might be confusing to users.

Also note, under this proposal the `squash` optarg would continue operating as it does now, but would have no effect on the initial values.

/cc @danielmewes @timmaxw @mlucy. How hard would this be to implement on top of the current replication logic? Are there flaws in the API? Would it work on more general streams (like `t.filter().changes()`)?
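To make the proposal concrete, here is a hypothetical transcript from the Python driver. The `timestamp` and `obsolete_keys` message fields, and the shape of the opaque token, are invented for illustration; none of this is shipped API.

```py
>>> feed = r.table('test').changes().run(conn)
>>> next(feed)
{'old_val': None, 'new_val': {'id': 1}, 'timestamp': '<opaque>'}

# After a crash or disconnect, pass back the last timestamp you durably processed:
>>> feed = r.table('test').changes(return_initial='<opaque>').run(conn)
>>> next(feed)
{'obsolete_keys': ['a', 'm']}  # first, drop anything you cached in this key range
>>> next(feed)
{'new_val': {'id': 1}, 'timestamp': '<opaque>'}  # refilled values, then live changes
```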