Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] Fixes and improvements for jobs/streams/events/api #853

Merged
merged 17 commits into from Jun 18, 2014

Conversation

tarruda
Copy link
Member

@tarruda tarruda commented Jun 16, 2014

This PR does three things:

  • Add code for dealing with msgpack parse failures(those are very rare but can happen as I experienced while running some manual tests)
  • Enable transfering big amounts of data through WStream instances
  • Use a single method of "locking" on a set of event sources, which is useful for [RFC] system() prefer pipes over tmpfiles #807 or the python ex command which will come after this.

@aktau, this should simplify the implementation of a job_sync function, it should be something like this:

job = job_start(...); // start a job
job_write(job, data); // write all data to the job
job_set_defer(job, false); // disable event deferral, we want to process job events inside `event_poll`
while (!exited) { // exited can be a flag set when the job exits
  event_poll(-1);
}

I have not tested the above code but it should work, let me know if you have any questions.

@aktau
Copy link
Contributor

aktau commented Jun 16, 2014

The commit message of 4bf36bd says least unobtrusive, I think that's supposed to be either least obtrusive or most unobtrusive.

@aktau
Copy link
Contributor

aktau commented Jun 16, 2014

Enable transfering big amounts of data through WStream instances

I'm guessing that refers to this commit: wstream: Change wstream_write failure behavior. It's a bit surprising how such a small change has that effect. It also seems to allow writing large amounts of data only once (until the write call succeeds), because in the lines after that buffer->size is added to wstream->curmem.

Also, tangentially, shouldn't the return value of uv_write in wstream_write influence the return value? (error or not).

@aktau, this should simplify the implementation of a job_sync function, it should be something like this:

Possibly it does, it looks quite nice to be able to just call event_poll without consequences. Though I'm not sure about having to check an external flag. Isn't there a way to check on the job struct itself?

Now I guess one of the bigger differences is the fact that the os_run_sync prototype didn't need to copy the input buffer (it passed it as const char *). This made it the callers responsibility to handle memory. Very perhaps, this is possible by manually ref'ing the WBuffer passed into wstream_write, such that it won't free it when write_cb is called. But I'm not sure that's a use case for which the ref/unref code was intended.

@tarruda
Copy link
Member Author

tarruda commented Jun 17, 2014

The commit message of 4bf36bd says least unobtrusive, I think that's supposed to be either least obtrusive or most unobtrusive.

👍 thanks

Possibly it does, it looks quite nice to be able to just call event_poll without consequences. Though I'm not sure about having to check an external flag. Isn't there a way to check on the job struct itself?

That makes sense, I will add 'job_alive' function to check status.

Now I guess one of the bigger differences is the fact that the os_run_sync prototype didn't need to copy the input buffer (it passed it as const char *). This made it the callers responsibility to handle memory. Very perhaps, this is possible by manually ref'ing the WBuffer passed into wstream_write, such that it won't free it when write_cb is called. But I'm not sure that's a use case for which the ref/unref code was intended.

True, I have and idea for fixing that

@tarruda
Copy link
Member Author

tarruda commented Jun 17, 2014

Also, tangentially, shouldn't the return value of uv_write in wstream_write influence the return value? (error or not).

I just read the source code of uv_write2(src/unix/stream.c). Here are the possible causes for failures:

  • bad file descriptor
  • invalid handle
  • out of memory

At first glance, the only one we should worry about is 'out of memory' because libuv calls malloc directly, this may also be a problem when using msgpack and possibly luajit in the future. I wonder whats the cleanest way of dealing with this(should we provide x-wrappers for the library functions that potentially allocate memory? In any case this should be fixed in another PR.

BTW, what do you think of the optional callback that can be passed to wstream_new_buffer? You can return false in the callback to free the memory yourself. Plus the callback gives some extra flexibility when one needs to know when the data was successfully written(I will probably use that to implement vimscript autocommands for job_write/send_event)

There seems to be no way to deal with failures when calling
`msgpack_unpacker_next`, so this reimplements that function as
`msgpack_rpc_unpack`, which has an additional result for detecting failures.

On top of that, we make use of the new function to properly return msgpack-rpc
errors when something bad happens.
Before this change, any write that could cause a WStream instance to use more
than `maxmem` would fail, which is not acceptable when writing big chunks of
data. (This could happen when returning contents from a big buffer through the
API, for example).

Writes of any size are now allowed, but before we check if the currently used
memory doesn't break the limit. This should be enough to prevent us from
stacking data when talking to a locked process.
@tarruda tarruda changed the title [RFC] Fixes and improvements for jobs/streams/events [RFC] Fixes and improvements for jobs/streams/events/api Jun 17, 2014
@tarruda
Copy link
Member Author

tarruda commented Jun 17, 2014

picked some commits from the redraw events PR as those will be needed soon

@aktau
Copy link
Contributor

aktau commented Jun 17, 2014

At first glance, the only one we should worry about is 'out of memory' because libuv calls malloc directly, this may also be a problem when using msgpack and possibly luajit in the future. I wonder whats the cleanest way of dealing with this(should we provide x-wrappers for the library functions that potentially allocate memory? In any case this should be fixed in another PR.

Pertinent question indeed. Maybe we can do that before the first true release (not in alpha/beta) (the x-wrappers, because there might be a lot of them). I'm not entirely sure of the best way to do that either.

BTW, what do you think of the optional callback that can be passed to wstream_new_buffer? You can return false in the callback to free the memory yourself. Plus the callback gives some extra flexibility when one needs to know when the data was successfully written(I will probably use that to implement vimscript autocommands for job_write/send_event)

It gives a lot of flexibility but perhaps makes the usage of wstream_new_buffer cumbersome. Perhaps somehow we could simplify and/or generalize. Is the copy flag still necessary? The possiblity of copy and the callbacks that signal if data must be freed after the WBuffer is done writing seems to "complect" things (the term as coined by Rich Hickey). It makes me think more about how to use the function when I need it, and not in a good way.

I'm have some trouble thinking of a way to uncomplicate the WBuffer, but perhaps there can be an alternative interface to wstream for writing one off data if the user knows that he won't free the buffer before the wstream is done with it. For now I only came up with a rather poor attempt at making an extra interface to wstream_write (raw) (optional cb) that would have a simpler convention and implementation:

typedef struct {
  uv_write_t req;
  done_cb cb;
  void *data;
} WriteDataRaw;

// write n bytes to the stream, `buf` will not be freed.
// you can safely free the buffer if either done_cb is called
// or you have another way of knowing that the stream was done/closed/...
// (for example when the parent job says it's done).
bool wstream_write_raw(WStream *wstream, const char *buf, size_t size)
{
  return wstream_write_raw_cb(wstream, buf, size, NULL, NULL);
}

bool wstream_write_raw_cb(WStream *wstream, const char *buf, size_t size, done_cb cb, void *data)
{
  // This should not be called after a wstream was freed
  assert(!wstream->freed);

  // I'm not sure if `curmem` and `maxmem` are relevant
  // for raw writes, since the memory is not managed nor allocated by the
  // wstream. Which is why I left the memory management out.

  uv_buf_t uvbuf = uv_buf_init((char *) buf, size);

  WriteDataRaw *wd = xmalloc(sizeof(WriteDataRaw));
  wd->req.data = wstream;
  wd->cb = cb;
  wd->data = data;

  return uv_write(&wd->req, wstream->stream, &uvbuf, 1, write_raw_cb) == 0;
}

// write_cb_raw is a simpler version of write_cb that doesn't try to do free the buffer
static void write_raw_cb(uv_write_t *req, int status)
{
  WriteDataRaw *wd = (WriteDataRaw *) req;
  WStream *wstream = wd->req.data;

  wstream_dec_pending(wstream);
  if (wb->cb) {
    wd->cb(wd->cb_data);
  }

  free(req);
}

// @return true if the stream was freed, false otherwise
static bool wstream_dec_pending(WStream *wstream)
{
  wstream->pending_reqs--;

  if (wstream->freed && wstream->pending_reqs == 0) {
    // Last pending write, free the wstream;
    free(wstream);
    return true;
  }

  return false;
}

// @return true if the buffer was freed, false otherwise
static bool wbuffer_dec_pending(WBuffer *buffer)
{
  if (!--buffer->refcount) {
    bool should_free = true;

    // perhaps not needed anymore...
    if (buffer->cb) {
      should_free = buffer->cb(buffer->cb_data);
    }

    if (should_free) {
      // Free the data written to the stream
      free(buffer->data);
    }

    // poison the struct
    memset(buffer, 0, sizeof(WBuffer));

    free(buffer);
    return true;
  }

  return false;
}

I think I mostly failed.

By the way, from a bit of inspection of the libuv source I think there can never be a race condition where write_cb is called between several calls of wstream_write with the same WStream (which would cause an assert fail). (uv_write2 -> uv__write -> uv__write_req_finish -> add to queue for next event loop -> cb). You had probably already noticed that, but I wanted to make sure.

Also, since now wstream_write can use more than maxmem memory, perhaps the function comment should be changed for wstream_write.

@tarruda
Copy link
Member Author

tarruda commented Jun 18, 2014

@aktau the wbuffer/wstream interface should be simpler now while still giving a lot of control to callers

@aktau
Copy link
Contributor

aktau commented Jun 18, 2014

@aktau the wbuffer/wstream interface should be simpler now while still giving a lot of control to callers

Looks quite nice indeed!

- Removed 'copy' parameter from `wstream_new_buffer`. Callers simply pass a
  copy of the buffer if required.
- Added a callback parameter, which is used to notify callers when the data is
  successfully written. The callback is also used to free the buffer(if
  required) and is compatible with `free` from the standard library.
The name `async` was not appropriate to describe the behavior enabled by the
flag.
This function will be used to temporarily change the `defer` flag on rstream
instances.
'job_start' returns the id as an out paramter, and the 'job_find' function is
now used by eval.c to translate job ids into pointers.
This is has the same effect as the RStream 'defer' flag, but also works for the
job's exit event.
This was done to give more control over memory management to job_write callers.
These functions will never be called directly by the user so bugs are the only
reason for passing invalid channel ids. Instead of returning silently we abort
to improve bug detection.
To make it possible reuse `event_poll` recursively and in other blocking
function calls, this changes how deferred/immediate events are processed:

- There are two queues in event.c, one for immediate events and another for
  deferred events. The queue used when pushing/processing events is determined
  with boolean arguments passed to `event_push`/`event_process` respectively.
- Events pushed to the immediate queue are processed inside `event_poll` but
  after the `uv_run` call. This is required because libuv event loop does not
  support recursion, and processing events may result in other `event_poll`
  calls.
- Events pushed to the deferred queue are processed later by calling
  `event_process(true)`. This is required to "trick" vim into treating all
  asynchronous events as special keypresses, which is the least obtrusive
  way of introducing asynchronicity into the editor.
- RStream instances will now forward the `defer` flag to the `event_push` call.
They were renamed to find_{buffer,window,tabpage}_by_handle to avoid conflicts
with existing functions of the same name.
- Rename a/n/m to items/size/capactity in kvec.h
- Add capactity field to Arrays/Dictionaries
@tarruda tarruda merged commit a7d027c into neovim:master Jun 18, 2014
tarruda added a commit that referenced this pull request Jun 18, 2014
dwb pushed a commit to dwb/neovim that referenced this pull request Feb 21, 2017
Should be done similar to `neomake#utils#getbufvar` instead anyway.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants