Base implementation of a libuv-based event loop v1 plugin for libcouchbase

LCB-LUV

A libuv-based event loop implementation for libcouchbase

DESCRIPTION

LCB-LUV is a libuv implementation of the struct libcouchbase_io_opt_st. It fully implements features needed by libcouchbase for an event loop.

Currently there is some logging output, and I have yet to check for memory leaks, extensive I/O errors, and the like. But it passes the libcouchbase test suite (modified to allow for pluggable event loops), so it's at least usable to some extent.

ARCHITECTURE

While libcouchbase (and most event loops) utilize a traditional Unix-style I/O-readiness model, libuv utilizes an I/O-completion model:

IO Event Models

Readiness

In a traditional unix-style event loop (assuming TCP sockets and all), non-blocking I/O is based on a readiness model.

The operating system provides 'handles': opaque numeric identifiers which correspond to TCP kernel buffers.

The TCP protocol implementation maintains internal buffers. When you send data, you are writing data to those buffers, and when you are receiving data, you are reading from those buffers.

In an I/O readiness model, you query the kernel (via select, poll, or a similar call) to tell you if a particular buffer (read or write) is available.

For read-readiness notification, it means that there is sufficient data inside the TCP read buffer that reading from it will not block.

For write-readiness notification, it means that there is sufficient free space in the TCP write buffer, so that writing more data into said buffer will not block.

Thus the I/O readiness model inherently assumes that the user code will itself perform the calls which read/write data. The notification mechanism simply reports on the readiness of those buffers (that there is data to read, or sufficient free space to write).

It can be said, then, that the Unix model of IO events reports on states of the socket buffers.
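To make the state-reporting idea concrete, here is a minimal sketch using poll(2). It assumes a local socketpair(2) instead of a real TCP connection (the readiness semantics are the same for this purpose), and the helper names query_readiness and readiness_demo are made up for illustration.

```c
#include <assert.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Ask the kernel which of the requested events are ready on fd.
 * Returns the revents mask (0 if nothing is ready yet) or -1 on error. */
static int query_readiness(int fd, short events, int timeout_ms)
{
    struct pollfd pfd;

    assert(fd >= 0);
    pfd.fd = fd;
    pfd.events = events;
    pfd.revents = 0;

    if (poll(&pfd, 1, timeout_ms) < 0) {
        return -1;
    }
    return pfd.revents;
}

/* Walk a connected socket pair through the buffer states described above.
 * Returns 0 if every state matched the expectation, -1 otherwise.
 * Assumption: a socketpair stands in for a TCP connection. */
static int readiness_demo(void)
{
    int sv[2];
    char byte = 'x';
    int ok = 1;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) {
        return -1;
    }

    /* Fresh socket: free space in the write buffer, nothing to read. */
    ok = ok && (query_readiness(sv[0], POLLOUT, 0) & POLLOUT);
    ok = ok && !(query_readiness(sv[0], POLLIN, 0) & POLLIN);

    /* The user code performs the write itself ... */
    ok = ok && (write(sv[1], &byte, 1) == 1);
    /* ... and only then does the peer's read buffer report readiness. */
    ok = ok && (query_readiness(sv[0], POLLIN, 0) & POLLIN);

    /* The user code also performs the read; the buffer is empty again. */
    ok = ok && (read(sv[0], &byte, 1) == 1);
    ok = ok && !(query_readiness(sv[0], POLLIN, 0) & POLLIN);

    close(sv[0]);
    close(sv[1]);
    return ok ? 0 : -1;
}
```

Calling readiness_demo() should return 0 on a POSIX system. Note that poll never moves any data; it only describes the buffers.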

Program Structure

In an I/O readiness model, the basic program structure is as follows. Let's use a simple HTTP request as a model.

I have grossly omitted error checking.

    1) Create a socket
        int sock = socket(....);

    2) Connect it, and register it with the event loop (we use poll(2)):
            connect(sock...);
            struct pollfd pfd = {
                .fd = sock,
                .events = POLLOUT,
            };

            /* wait for the initial connection to complete */

            poll(&pfd, 1, -1);

    3) Write into a buffer:
            const char *http_request = "GET / HTTP/1.0\r\n\r\n";
            size_t len = strlen(http_request);

    4) Wait for the socket to become writeable (select, poll, or other abstraction)
            pfd.events = POLLOUT;
            poll(&pfd, 1, -1);

    5) Write data into the socket's write buffer:
            /* send() returns ssize_t: it may be -1 on error */
            ssize_t nwritten = send(sock, http_request, len, 0);
            len -= nwritten;
            http_request += nwritten;

    6) Repeat steps 4 and 5 until 'len' is 0 (all data has been written)


    7) Allocate a buffer large enough to hold the response:
        We pretend the response will never be larger than the length of
        the buffer

            char http_response[8192];
            size_t resp_len = sizeof(http_response);
            int response_complete = 0;
            stream_parser = http_parser_new();
            char *bufp = http_response;

    8) Wait for the socket to become readable
            pfd.events = POLLIN;
            poll(&pfd, 1, -1);

    9) Read data from the socket's read buffer
            ssize_t nread = recv(sock, bufp, resp_len, 0);

            /* Tell the stream parser we have new data */
            response_complete = stream_parser_feed(stream_parser, bufp, nread);
            bufp += nread;
            resp_len -= nread;

    10) Repeat steps 8 and 9 until:
            either resp_len is 0 (response too large)
            or response_complete is true (response has been received)

Completion

For IOCP and libuv-style event mechanisms, the basic atomic unit is not a simple opaque file descriptor, but rather a complex structure consisting of a set of read and write buffers.

While these buffers are userland buffers (and don't live in the kernel's TCP stack [well, maybe they do for Windows]), it is the status of those buffers which forms the basis of the events.

Thus, instead of a read-readiness notification, there is a read-buffer-has-data notification; and instead of a write-readiness notification, there is a data-has-been-written notification.

In contrast to the Unix model where notification is based on socket states, the IOCP model is based on socket operations. The notification is a result of the operation. Therefore, instead of waiting to be able to perform an operation, we wait for an operation to complete.
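The difference can be sketched without libuv at all. In the hypothetical code below, the request object carries its own buffer and a callback, and a stand-in for the event loop performs the read and then reports the result. The names read_request, loop_complete_read and completion_demo are illustrative assumptions, and a pipe stands in for a socket.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical request object: the buffer belongs to the operation,
 * and the notification is the result of that operation. */
struct read_request {
    int fd;
    char *buf;
    size_t len;
    void (*on_complete)(struct read_request *req, ssize_t nread);
    void *data; /* user pointer, in the spirit of libuv's ->data field */
};

/* Stand-in for the event loop: it performs the read on the user's behalf
 * and reports the outcome, instead of reporting that a read is possible. */
static void loop_complete_read(struct read_request *req)
{
    assert(req->on_complete != NULL);
    ssize_t nread = read(req->fd, req->buf, req->len);
    req->on_complete(req, nread);
}

/* Completion callback: the data is already sitting in req->buf. */
static void store_result(struct read_request *req, ssize_t nread)
{
    *(ssize_t *)req->data = nread;
}

/* Push four bytes through a pipe and let the "loop" complete the read.
 * Returns the byte count reported to the callback, or -1 on failure. */
static ssize_t completion_demo(char *out, size_t outlen)
{
    int fds[2];
    ssize_t result = -1;
    struct read_request req;

    if (pipe(fds) != 0) {
        return -1;
    }
    if (write(fds[1], "pong", 4) == 4) {
        req.fd = fds[0];
        req.buf = out;
        req.len = outlen;
        req.on_complete = store_result;
        req.data = &result;
        loop_complete_read(&req);
    }
    close(fds[0]);
    close(fds[1]);
    return result;
}
```

The user code never calls read itself here; it only learns that the read has happened, which is the inversion the rest of this document has to work around.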

Program Structure

Using our HTTP client model from the Unix example, let's provide an outline of how this would work using uv's IO completion model.

Again, I have left out error checking, and some other annoyances. It is also assumed there is some magical function which suspends the event loop whenever

It might be mentioned that while the IOCP model seems simpler conceptually, in practice the implementation is significantly more complex (albeit more extensible).

    0) Declare callbacks and globals:
        uv_loop_t *Loop;
        uv_tcp_t Stream; /* a value, since &Stream is passed around below */
        uv_connect_t MyConnect;
        uv_write_t MyWrite;
        StreamParser_t StreamParser;


        char ReadData[8092];
        /* an array rather than a pointer, so that sizeof gives the length */
        char WriteData[] = "GET / HTTP/1.0\r\n\r\n";
        uv_buf_t ReadBuf, WriteBuf;
        size_t LastReadSize = 0, ReadTotal = 0;

        /* a miniature state machine */
        typedef void (*next_action)(void);

        void when_connected(void);
        void when_written(void);
        void when_read(void);
        void when_done(void);

        /* callbacks required by libuv */
        /* these invoke the next state */
        void connect_callback(uv_connect_t *connect, int status) {
           ((next_action)(connect->data))();
        }

        void write_done_callback(uv_write_t *req, int status) {
            ((next_action)(req->data))();
        }

        void read_data_callback(uv_stream_t *stream, ssize_t nread, uv_buf_t buf) {
            LastReadSize = nread;
            ReadTotal += nread;
            ((next_action)(stream->data))();
        }

        uv_buf_t alloc_callback(uv_handle_t *handle, size_t suggested_size) {
            /* hand libuv the unused tail of ReadData, so reads accumulate */
            ReadBuf.base = ReadData + ReadTotal;
            ReadBuf.len = sizeof(ReadData) - ReadTotal;
            return ReadBuf;
        }

    1) Initialize our data:
        void run_this(void) {
            StreamParser = stream_parser_new();
            Loop = uv_default_loop();
            uv_tcp_init(Loop, &Stream);
            MyConnect.data = when_connected;
            uv_tcp_connect(&MyConnect, &Stream, ...);
        }

    2) Define what happens when we are connected:

        void when_connected(void) {
            WriteBuf.base = WriteData;
            WriteBuf.len = sizeof(WriteData)-1; /* don't need the trailing NUL */
            MyWrite.data = when_written;
            uv_write(&MyWrite, (uv_stream_t *)&Stream, &WriteBuf, 1,
                    write_done_callback);
        }

    3) Define what happens when data is written
        void when_written(void) {
            /* Let's read a response */
            ReadBuf.base = ReadData;
            ReadBuf.len = sizeof(ReadData);
            Stream.data = when_read;

            uv_read_start((uv_stream_t *)&Stream, alloc_callback, read_data_callback);
        }

    4) Define what happens when we've read data

        void when_read(void) {
            /* Check if our response has trickled in; feed only the new bytes */
            if (stream_parser_feed(StreamParser, ReadData + ReadTotal - LastReadSize, LastReadSize)) {
                uv_read_stop((uv_stream_t *)&Stream);
                /* no further read callbacks will fire, so advance the state here */
                when_done();
            }
        }

    5) Define what happens when we've received a complete response:
        void when_done(void) {
            printf("Response Complete!\n");
            fwrite(ReadData, 1, ReadTotal, stdout);
            uv_unref(Loop); /* stop the loop */
        }

    6) Tie it all together:
        run_this();
        uv_run(Loop);

Adapting Completion to Readiness

Because the order and structure of the operations differ significantly between the two event models, a few tricks were needed in order to adapt IOCP to IO readiness.

Reads

For reading, a read-ahead is always active. This means that whenever libcouchbase asks to poll a socket for 'read buffer readiness', what actually happens is that LCB-LUV pre-emptively starts a read into an internal read-ahead buffer.

The read-ahead stops when the buffer is full.

The next time libcouchbase calls a recv/read function, the implementation copies from the read-ahead buffer into the buffer that libcouchbase supplied to recv.

Read-ahead remains suspended until we have a short read (libcouchbase requests more data than we have in the buffer). In this case, we return with the partial data (or set errno to EWOULDBLOCK if the read buffer is completely empty).

While copying memory may not be ideal, it is possibly more efficient as there are fewer system calls initiated by libcouchbase directly.

Errors are delivered asynchronously. If the libuv read callback encounters an error, it will be set in a special flag within the socket structure.

If libcouchbase has requested read-readiness notifications, the function will respond with an error (but only once all data from the read-ahead buffer has been exhausted).
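The read path described above can be sketched roughly as follows. This is not the plugin's actual code: the structure, the tiny capacity and the names ra_on_data, ra_on_error and ra_recv are assumptions made for illustration, and the libuv read callback is simulated by calling ra_on_data directly.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>

#define RA_CAPACITY 16 /* hypothetical size, tiny on purpose */

struct readahead {
    char buf[RA_CAPACITY];
    size_t pos;      /* next byte to hand to the caller */
    size_t used;     /* bytes filled in by the read callback */
    int active;      /* is the read-ahead currently running? */
    int pending_err; /* errno saved by the read callback, 0 if none */
};

/* Simulated read-completion callback: store data, stop when full. */
static size_t ra_on_data(struct readahead *ra, const char *data, size_t len)
{
    size_t room = RA_CAPACITY - ra->used;
    if (len > room) {
        len = room;
    }
    memcpy(ra->buf + ra->used, data, len);
    ra->used += len;
    if (ra->used == RA_CAPACITY) {
        ra->active = 0;
    }
    return len;
}

/* Simulated error in the read callback: remember it for later delivery. */
static void ra_on_error(struct readahead *ra, int err)
{
    ra->pending_err = err;
    ra->active = 0;
}

/* recv() emulation: copy out of the read-ahead buffer. */
static ssize_t ra_recv(struct readahead *ra, char *out, size_t want)
{
    assert(ra->pos <= ra->used);
    size_t avail = ra->used - ra->pos;

    if (avail == 0) {
        ra->pos = ra->used = 0;
        if (ra->pending_err != 0) {
            /* buffered data is exhausted, so the error surfaces now */
            errno = ra->pending_err;
            ra->pending_err = 0;
            return -1;
        }
        ra->active = 1;
        errno = EWOULDBLOCK;
        return -1;
    }

    size_t n = want < avail ? want : avail;
    memcpy(out, ra->buf + ra->pos, n);
    ra->pos += n;
    if (n < want) {
        /* short read: the buffer is drained, so resume the read-ahead */
        ra->pos = ra->used = 0;
        if (ra->pending_err == 0) {
            ra->active = 1;
        }
    }
    return (ssize_t)n;
}
```

For example, after ra_on_data stores "hello", a ra_recv of 3 bytes returns 3, a following ra_recv of 10 bytes returns the remaining 2 and re-arms the read-ahead, and a third call fails with EWOULDBLOCK.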

Writes

Write readiness is implemented by using an internal buffer as well, and its availability is indicated by the space available in this internal buffer.

Writes are batched into a single request. When libcouchbase uses the send function, data is written to the internal write buffer (and when there is no more space, EWOULDBLOCK is returned).

The actual uv_write operation is scheduled on the first write of each event loop iteration. Consider a control flow like this:

    libcouchbase write-readiness callback invoked:

        while ((nw = send(sock, data, size)) == size);

    libcouchbase write-readiness callback returns:

        LCB-LUV checks to see if there is any data in the
        write buffer.

        If there is, it will batch it all up in a single uv_write(),
        which is triggered at the next event loop iteration.

By design, the write buffer is considered unavailable until the previous write operation has flushed. This prevents sending out massive amounts of data to a possibly overloaded node, preferring to have them timed-out and purged from the internal write buffers instead.
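A rough sketch of this write path, under the same caveats as before: the structure and the names wb_send, wb_flush_start and wb_flush_done are hypothetical, the capacity is deliberately tiny, and the single batched uv_write is represented only by the byte count that wb_flush_start returns.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>

#define WB_CAPACITY 8 /* hypothetical size, tiny on purpose */

struct writebuf {
    char buf[WB_CAPACITY];
    size_t used;
    int flush_scheduled; /* set by the first send of a loop iteration */
    int flushing;        /* a batched write is still in flight */
};

/* send() emulation: append to the internal buffer. */
static ssize_t wb_send(struct writebuf *wb, const char *data, size_t len)
{
    assert(wb->used <= WB_CAPACITY);
    if (wb->flushing || wb->used == WB_CAPACITY) {
        /* unavailable until the previous write has flushed, or full */
        errno = EWOULDBLOCK;
        return -1;
    }
    size_t room = WB_CAPACITY - wb->used;
    size_t n = len < room ? len : room;
    memcpy(wb->buf + wb->used, data, n);
    wb->used += n;
    wb->flush_scheduled = 1;
    return (ssize_t)n;
}

/* Runs after the write-readiness callback has returned. Returns the number
 * of bytes that would go out in one batched write (stand-in for uv_write). */
static size_t wb_flush_start(struct writebuf *wb)
{
    if (!wb->flush_scheduled || wb->flushing) {
        return 0;
    }
    wb->flush_scheduled = 0;
    wb->flushing = 1;
    return wb->used;
}

/* Simulated write-completion callback: the buffer is available again. */
static void wb_flush_done(struct writebuf *wb)
{
    wb->used = 0;
    wb->flushing = 0;
}
```

With an 8-byte buffer, two 5-byte sends accept 5 and then 3 bytes, a third send fails with EWOULDBLOCK, and sends keep failing between wb_flush_start and wb_flush_done.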

Timers

Timers are pretty much a thin wrapper over the interface libuv provides. Nothing special here.

Hooks

libcouchbase will call the event loop's run_event_loop function whenever it wants data, and stop_event_loop whenever it has completed all operations.

In synchronous event loops, these actually transfer control to and from the event loop, but in asynchronous environments they are generally no-ops.

For tests and other purposes, LCB-LUV provides user-defined hooks which may be called when libcouchbase invokes these start/stop functions.
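As an illustration only, such hooks might look something like the sketch below. The struct loop_hooks type and the hooked_* functions are invented names, not the plugin's real interface; the point is just that "run" and "stop" notify a user callback instead of transferring control.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical hook table; the names are illustrative, not the plugin's API. */
struct loop_hooks {
    void (*on_start)(void *arg); /* libcouchbase wants data */
    void (*on_stop)(void *arg);  /* libcouchbase has finished its operations */
    void *arg;
};

/* In an asynchronous environment the loop is already running, so "run"
 * only notifies the hook (if any) and returns immediately. */
static void hooked_run_event_loop(const struct loop_hooks *hooks)
{
    assert(hooks != NULL);
    if (hooks->on_start != NULL) {
        hooks->on_start(hooks->arg);
    }
}

/* Likewise, "stop" is a no-op apart from the optional hook. */
static void hooked_stop_event_loop(const struct loop_hooks *hooks)
{
    assert(hooks != NULL);
    if (hooks->on_stop != NULL) {
        hooks->on_stop(hooks->arg);
    }
}

/* Example hook for a test: count how often it was invoked. */
static void count_call(void *arg)
{
    ++*(int *)arg;
}
```

A test could install count_call as the start hook and use the counter to decide when to move to its next state.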

Reentrancy and Reference Counting

Event loops are notoriously difficult to deal with regarding re-entrancy issues. Therefore, reference counting for socket objects has been implemented.

Additionally, libuv may 'own' a socket even while libcouchbase thinks it has been closed. In this case, a close call will trigger a deferred refcount decrement.
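The deferred decrement can be pictured with a small sketch. Everything here is an assumption for illustration (the rc_socket structure, its fields and the rc_* function names); the real socket structure lives in the plugin's sources.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical refcounted socket object. */
struct rc_socket {
    unsigned refcount;
    int closed;         /* libcouchbase has asked for a close */
    int loop_owns;      /* the event loop still has an operation in flight */
    int deferred_unref; /* a decrement is owed once the loop lets go */
    int *freed_flag;    /* set to 1 on destruction, for demonstration */
};

static struct rc_socket *rc_socket_new(int *freed_flag)
{
    struct rc_socket *s = calloc(1, sizeof(*s));
    if (s != NULL) {
        s->refcount = 1;
        s->freed_flag = freed_flag;
    }
    return s;
}

static void rc_socket_unref(struct rc_socket *s)
{
    assert(s->refcount > 0);
    if (--s->refcount == 0) {
        if (s->freed_flag != NULL) {
            *s->freed_flag = 1;
        }
        free(s);
    }
}

/* The loop starts an operation that references the socket. */
static void rc_loop_acquire(struct rc_socket *s)
{
    s->loop_owns = 1;
}

/* Close requested by the client: decrement now, or defer it if the
 * loop still owns the socket. */
static void rc_socket_close(struct rc_socket *s)
{
    s->closed = 1;
    if (s->loop_owns) {
        s->deferred_unref = 1;
        return;
    }
    rc_socket_unref(s);
}

/* The loop's operation has completed: perform any deferred decrement. */
static void rc_loop_release(struct rc_socket *s)
{
    int deferred = s->deferred_unref;
    s->loop_owns = 0;
    s->deferred_unref = 0;
    if (deferred) {
        rc_socket_unref(s); /* s may be freed here; do not touch it after */
    }
}
```

If the loop holds the socket when rc_socket_close is called, the object survives until rc_loop_release; otherwise it is freed immediately.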

BUILDING

So you probably want to read this first.

In order to build LCB-LUV, you will need the source code for both libuv AND libcouchbase.

For libuv you will need a clean source tree, which should by default reside in this project's uv/ directory.

For libcouchbase, you will need a modified version with a patch I've just pushed to gerrit (in order to allow proper testing).

    http://review.couchbase.org/#change,14132

Additionally, you will need curses for the yolog logging module (this will be moved out when I've decided I don't need elaborate logging).

You should edit the Makefile's LIBCOUCHBASE_SRC to point to the location of your libcouchbase source code.

Your directory structure should look like this:

    $ ls
    common.c              read.c            uv
    lcb_luv_internal.h    README.pod        write.c
    libcouchbase-libuv.h  run_lcb_tests.sh  yolog.c
    main.c                socket.c          yolog.h
    Makefile              test
    plugin-libuv.c        timer.c

    $ ls uv/
    AUTHORS          gyp_uv     src
    build            include    test
    common.gypi      LICENSE    uv.gyp
    config-mingw.mk  Makefile   vcbuild.bat
    config-unix.mk   README.md

Running make will build the uv sources, the dynamic libuv.so, and the plugin, libcouchbase-libuv.so.

To run basic async mode tests, you can do make check.

This is an example of a successful invocation (mind the exit error).

    $ make check
    # ...
    [simple1] get_callback:30 Get callback successful
    [common] lcb_luv_update_event:246 Requested events 2
    [test] stop_callback:15 stop_event_loop(). Will invoke next state
    [simple1] t00_contcb_next:68 Exiting now..
    make: *** [check] Error 1

To run libcouchbase with the libuv event loop, run make check-lcb.

   $ make check-lcb

   # lots of output here.
   # ..
   ===================
   All 14 tests passed
   ===================

To run a specific test from within the libcouchbase test suite, you can set the LCB_TEST_NAME Make variable, like so:

    $ make check-lcb LCB_TEST_NAME=arithmetic-test
    Will try to use loop: libuv
    # successful termination.

To get more verbose output, set LCB_LUV_DEBUG in the environment.
