What's the best practice for monitoring a filtered stream? #11

Closed · njcaruso opened this issue Sep 15, 2015 · 16 comments

@njcaruso

I need to stream changes for a specific user, not for all users.

In the code below I can successfully run the initial query for a specific user's messages, but updates arriving on the stream are not filtered, so I get every change to the Message model regardless of the filter.

  1. Is there a way to filter a change stream down to a subset, specifically to the authenticated user?
  2. I have a belongsTo relation between my Message model and my user model (which has User as its base model). Can I monitor a stream specific to user.Message, i.e. scoped to just that relation?

Here's my current code:

    // SSE endpoint for the Message change stream (consumed via angular-live-set)
    var url = '/api/messages/change-stream' +
      '?_format=event-source' +
      '&access_token=' + AuthService.getAccessToken();

    var src = new EventSource(url);
    var changes = createChangeStream(src);
    var set;

    // The initial query is filtered by user, but the change stream is not
    Message.find({
      filter: {
        where: {
          userId: '55f73db4b3a03aa495c0aef2'
        },
        order: 'created DESC',
        limit: 5
      }
    }).$promise.then(function(results) {
      set = new LiveSet(results, changes);
      vm.messages = set.toLiveArray();
    });

I can see here that you can set a where clause when creating a change stream: https://apidocs.strongloop.com/loopback/#persistedmodel-createchangestream, but I'm struggling with how to do this with the LiveSet approach. I suspect that from the Angular client I can somehow create a change stream and pass a where clause in as an option.

I'm also struggling to understand how the lbServices createChangeStream, i.e. Message.createChangeStream(), factors into all of this versus the newly released createChangeStream() factory in angular-live-set.

@njcaruso
Author

After some research and a look at the source code in persisted-model: while it looks like createChangeStream should support an options parameter, it doesn't actually do anything with it. I wrote a gist that creates my own stream from a remote hook and only streams data back for the currently logged-in user. I hope this helps someone in the future:

https://gist.github.com/njcaruso/ffa81dfbe491fcb8f176
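
For reference, the approach is roughly along these lines (a minimal sketch, not the gist itself, written as a custom remote method; the Message model, the streamForUser name, and the userId argument are illustrative assumptions):

    // common/models/message.js -- hypothetical file, names are illustrative
    module.exports = function(Message) {
      Message.streamForUser = function(userId, res) {
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Cache-Control', 'no-cache');

        Message.createChangeStream(function(err, changes) {
          if (err) { res.end(); return; }
          changes.on('data', function(change) {
            // Only forward changes that belong to the requesting user
            if (change.data && String(change.data.userId) === String(userId)) {
              res.write('data: ' + JSON.stringify(change) + '\n\n');
            }
          });
        });
      };

      Message.remoteMethod('streamForUser', {
        http: {path: '/stream-for-user', verb: 'get'},
        accepts: [
          {arg: 'userId', type: 'string', required: true},
          {arg: 'res', type: 'object', http: {source: 'res'}}
        ]
      });
    };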

@steve8708

Major +1 to including this as a built-in feature, and to you @njcaruso for your great example. Really helpful.

@allupaku

@njcaruso - the solution was awesome. I also adapted your code and used loopback-filters to apply all the filters to the change-stream objects. I created a gist here:

https://gist.github.com/allupaku/4139d4a366983568206a

Maybe this will be useful for someone.
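
For anyone skimming, the core of that idea is something like the following sketch (assuming a filter object parsed from the request and a changes stream from createChangeStream; the variable names are illustrative):

    // loopback-filters applies a LoopBack-style filter object to in-memory data
    var applyFilter = require('loopback-filters');

    changes.on('data', function(change) {
      // The module filters arrays, so wrap the single change payload
      var matched = applyFilter([change.data], filter);
      if (matched.length) {
        res.write('data: ' + JSON.stringify(change) + '\n\n');
      }
    });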

@zihehuang

@allupaku - I tested your gist and it worked very well for me. This issue is definitely worthy of a PR to loopback's main repository. The documentation on PersistedModel's createChangeStream is clearly misleading, since StrongLoop never implemented the where filter.

@njcaruso
Author

@allupaku, @zihehuang glad I could help. I ran into an issue using EventSource (nothing to do with LoopBack, I think): if the client disconnects manually or the browser closes, the server never receives a "close" or "end" event, so the connection persists forever. Additionally, it seems that after 60 seconds with no activity on the stream, the client opens a NEW connection. So after 5 minutes of no activity you'll have 5 connections on the server.

I ended up having to set a 60-second timer that destroys the stream, with a changes.on('data') handler that resets the timer whenever data is sent.

So test your close/end assumptions. If you are able to get it working cleanly, please let me know.
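
A sketch of that workaround, assuming the change stream and response are in scope as changes and res and that 60 seconds is the idle window (not the author's exact code):

    function guardIdleStream(changes, res, idleMs) {
      var idleTimer;

      function resetIdleTimer() {
        clearTimeout(idleTimer);
        idleTimer = setTimeout(function() {
          // No data for idleMs: assume the client is gone and clean up,
          // since an EventSource disconnect never reaches the server as 'close'
          changes.removeAllListeners('data');
          res.end();
        }, idleMs);
      }

      changes.on('data', resetIdleTimer); // any activity pushes the timeout out
      resetIdleTimer();
    }

    // e.g. inside the remote method, next to the res.write handler:
    // guardIdleStream(changes, res, 60 * 1000);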

@allupaku

@njcaruso - I have updated the gist with another file, a middleware I use to remove the default timeout of the Node/Express web server. The stream was timing out approximately every two minutes and throwing an error; this middleware fixed it.

I am now testing whether I can reproduce the issue of multiple connections.

@allupaku

@njcaruso - with 60 seconds of no activity I am not seeing the client open another connection; I just double-checked. Could this be browser-specific behavior? Which browser are you testing with?

@allupaku

@zihehuang - I have added another lbServices gist as well, which may help in creating the change stream. Could you please test that out if possible and let me know the results? For me it is working perfectly.

@zihehuang

@allupaku @njcaruso - I am not using lbServices, so I cannot confirm that use case. However, I can confirm that calling res.setTimeout with a large value does solve the timeout issue. The code below is my remote method:

  user.testSSE = function(id, res) {
    res.setTimeout(24 * 3600 * 1000);
    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");
    res.write("id: " + id + "\n");
    app.models.userEventStream.createChangeStream({where: {userId: id}}, function(err, changes) {
      changes.on('data', (chunk) => {
        res.write("data: " + JSON.stringify(chunk.data) + "\n\n");
      });
    });
  };

  user.remoteMethod(
    'testSSE',
    {
      http: {path: '/:id/testSSE', verb: 'get'},
      accepts: [
        {arg: 'id', type: 'number', required: true},
        {arg: 'res', type: 'object', http: {source: 'res'}}
      ]
    }
  );

Before adding res.setTimeout as suggested by @allupaku, the client reconnected the stream every 2 minutes.
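
For completeness, a minimal client-side sketch for consuming that endpoint (the URL path and the userId and accessToken variables are assumptions; because the method writes plain data: lines, the events arrive as default message events):

    var src = new EventSource('/api/users/' + userId +
      '/testSSE?access_token=' + accessToken);

    src.onmessage = function(event) {
      var change = JSON.parse(event.data); // the chunk.data written by testSSE
      console.log('change for user', userId, change);
    };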

@dancingshell

+1, I can duplicate what @zihehuang is seeing: setting the timeout in server.js does not prevent the change streams from timing out after 2 minutes.

app.middleware('routes:before', function (req, res, next) {
  if (req.path.indexOf('change-stream') !== -1) {
    res.setTimeout(24 * 3600 * 1000);
    res.set('X-Accel-Buffering', 'no');
  }
  return next();
});

@AndruxaSazonov

I'm using this code and it works, but only on the main model, not on the models nested via the filter's include:

function reactivate(eventUrl) {

  return function($q, $resource, LiveSet, createChangeStream) {
    var sources = new EventSource(eventUrl);
    var changes = createChangeStream(sources);

    sources.addEventListener('data', function(data) { console.log(data); });

    var resourceObject = $resource(eventUrl.replace('/change-stream?_format=event-source&', '?'));

    return $q(function(resolve, reject) {
      resourceObject.get(function success(results) {
        var set = new LiveSet(results.data, changes).toLiveArray();
        resolve(set);
      }, function failure(results) {
        reject(results);
      });
    });
  };
}

 // url something like: http://localhost:3030/data/cards/change-stream?_format=event-source&filter={"include":{"media":"image"}}

So the change events arrive when "cards" change, but not when the "image" or "media" objects change (both are related as hasOne).

I don't know what to do for now.

@didikeke

didikeke commented Sep 17, 2017

@dancingshell's middleware works well when connecting to loopback directly.

However, it doesn't work behind an nginx proxy; I got 'net::ERR_INCOMPLETE_CHUNKED_ENCODING' every 60 seconds.

Finally, adding
proxy_read_timeout 24h;
to the location configuration of nginx made everything work. (nginx version: nginx/1.10.3)

I'm posting this comment here in case someone has the same problem.
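
Roughly, the nginx side looks like this (the location path and upstream are assumptions; proxy_buffering off mirrors the X-Accel-Buffering: no header used earlier in the thread):

    location /api/ {
        proxy_pass http://127.0.0.1:3000;
        proxy_read_timeout 24h;   # keep the SSE connection open
        proxy_buffering off;      # don't buffer the event stream
    }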

@nemes1s

nemes1s commented Nov 7, 2017

I'm having trouble getting changes from one browser window to another using the technique described in the gist. From one browser instance to another, I may need the specific model instance to be updated 3-4 times before any data reaches the EventSource stream.

Has anyone experienced something like that?

Thanks!

@nemes1s

nemes1s commented Nov 9, 2017

OK, it was my mistake; I misunderstood how cluster works. The update fired on one worker while the user's stream was attached to another.

@stale

stale bot commented Feb 11, 2020

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the stale label Feb 11, 2020
@stale

stale bot commented Feb 27, 2020

This issue has been closed due to continued inactivity. Thank you for your understanding. If you believe this to be in error, please contact one of the code owners, listed in the CODEOWNERS file at the top-level of this repository.

stale bot closed this as completed Feb 27, 2020