Skip to content

Commit

Permalink
streaming: Enable streaming reads from a pipe.
Browse files Browse the repository at this point in the history
This adds a new read=FILENAME operation to allow streaming in the
reverse direction: from a local pipe to the NBD client.
  • Loading branch information
rwmjones committed Jul 24, 2020
1 parent 97d3cb4 commit 2aa7b35
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 44 deletions.
33 changes: 30 additions & 3 deletions plugins/streaming/nbdkit-streaming-plugin.pod
Expand Up @@ -4,7 +4,9 @@ nbdkit-streaming-plugin - nbdkit streaming plugin

=head1 SYNOPSIS

nbdkit streaming write=FILENAME [size=SIZE]
nbdkit streaming write=PIPE [size=SIZE]

nbdkit streaming read=PIPE [size=SIZE]

=head1 DESCRIPTION

Expand All @@ -27,6 +29,20 @@ This can be visualised as:
writes │ plugin │
└───────────┘

If the NBD client opens the NBD port and I<reads> from the start to
the end of the disk without seeking backwards, then you can turn a
local pipe or socket into a stream of data for that client:

nbdkit streaming read=./pipe

This can be visualised as:

┌───────────┐
plugin streams │ nbdkit │ NBD
data from ──────▶│ streaming │──────▶ client
./pipe │ plugin │ reads
└───────────┘

Note that F<./pipe> (or the local socket) sees raw data, it is not
using the NBD protocol. If you want to forward NBD to a local socket
connected to another NBD server, use L<nbdkit-nbd-plugin(1)>.
Expand Down Expand Up @@ -57,20 +73,31 @@ of order). This approach cannot work for other formats such as qcow2
since those contain metadata that must be updated by seeking back to
the start of the file which is not possible if the output is a pipe.

The reverse is to get qemu-img to read from a pipe:

nbdkit -U - streaming read=./pipe \
--run ' qemu-img convert -f raw $nbd -O qcow2 output.qcow2 '

For use of the I<--run> and I<-U -> options, see L<nbdkit-captive(1)>.

=head1 PARAMETERS

Either C<read> or C<write> is required, but not both.

=over 4

=item B<read=>FILENAME

Read data stream from the named pipe or socket. If the pipe or socket
does not exist, then it is created (as a named FIFO), otherwise the
existing pipe or socket is opened and used.

=item B<write=>FILENAME

Write data stream to the named pipe or socket. If the pipe or socket
does not exist, then it is created (as a named FIFO), otherwise the
existing pipe or socket is opened and used.

This parameter is required.

=item B<pipe=>FILENAME

For backwards compatibility this is a synonym for C<write=FILENAME>.
Expand Down
190 changes: 149 additions & 41 deletions plugins/streaming/streaming.c
Expand Up @@ -38,12 +38,16 @@
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <nbdkit-plugin.h>

/* Mode - read or write? */
static enum { UNKNOWN_MODE, READ_MODE, WRITE_MODE } mode = UNKNOWN_MODE;

/* The pipe. */
static char *filename = NULL;
static int fd = -1;
Expand All @@ -58,8 +62,8 @@ static int64_t size = INT64_C(9223372036854775296);
*/
static bool errorstate = 0;

/* Highest byte (+1) that has been written in the data stream. */
static uint64_t highestwrite = 0;
/* Highest byte (+1) that has been accessed in the data stream. */
static uint64_t highest = 0;

static void
streaming_unload (void)
Expand All @@ -75,7 +79,20 @@ streaming_config (const char *key, const char *value)
{
if (strcmp (key, "write") == 0 ||
strcmp (key, "pipe") == 0) {
/* See FILENAMES AND PATHS in nbdkit-plugin(3). */
if (mode != UNKNOWN_MODE) {
nbdkit_error ("you cannot use read and write options at the same time");
return -1;
}
mode = WRITE_MODE;
goto adjust_filename;
}
else if (strcmp (key, "read") == 0) {
if (mode != UNKNOWN_MODE) {
nbdkit_error ("you cannot use read and write options at the same time");
return -1;
}
mode = READ_MODE;
adjust_filename:
filename = nbdkit_absolute_path (value);
if (!filename)
return -1;
Expand All @@ -93,27 +110,45 @@ streaming_config (const char *key, const char *value)
return 0;
}

/* Check the user did pass a write=<FILENAME> parameter. */
/* Did the user pass either the read or write parameter? */
static int
streaming_config_complete (void)
{
if (filename == NULL) {
nbdkit_error ("you must supply the write=<FILENAME> parameter "
if (mode == UNKNOWN_MODE) {
nbdkit_error ("you must supply either the read=<FILENAME> or "
"write=<FILENAME> parameter "
"after the plugin name on the command line");
return -1;
}

return 0;
}

#define streaming_config_help \
"read=<FILENAME> The pipe or socket to read.\n" \
"write=<FILENAME> The pipe or socket to write.\n" \
"size=<SIZE> (optional) Stream size."

static int
streaming_get_ready (void)
{
int flags;

assert (mode != UNKNOWN_MODE);
assert (filename != NULL);
assert (fd == -1);

flags = O_CLOEXEC|O_NOCTTY;
if (mode == WRITE_MODE)
flags |= O_RDWR;
else
flags |= O_RDONLY;

/* Open the file blindly. If this fails with ENOENT then we create a
* FIFO and try again.
*/
again:
fd = open (filename, O_RDWR|O_CLOEXEC|O_NOCTTY);
fd = open (filename, flags);
if (fd == -1) {
if (errno != ENOENT) {
nbdkit_error ("open: %s: %m", filename);
Expand All @@ -129,10 +164,6 @@ streaming_get_ready (void)
return 0;
}

#define streaming_config_help \
"write=<FILENAME> (required) The filename to serve.\n" \
"size=<SIZE> (optional) Stream size."

/* Create the per-connection handle. */
static void *
streaming_open (int readonly)
Expand All @@ -151,6 +182,16 @@ streaming_open (int readonly)
return NBDKIT_HANDLE_NOT_NEEDED;
}

/* In write mode, writes are allowed. In read mode, we act as if -r
* was passed on the command line and the client will not be allowed
* to write.
*/
static int
streaming_can_write (void *h)
{
return mode == WRITE_MODE;
}

#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS

/* Return the size of the stream (infinite). */
Expand All @@ -160,6 +201,90 @@ streaming_get_size (void *handle)
return size;
}

/* Read data back from the stream. */
static int
streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
{
size_t n;
ssize_t r;

if (errorstate) {
nbdkit_error ("unrecoverable error state");
errno = EIO;
return -1;
}

if (mode == READ_MODE) {
if (offset < highest) {
nbdkit_error ("client tried to seek backwards and read: "
"the streaming plugin does not support this");
errorstate = true;
errno = EIO;
return -1;
}

/* If the offset is higher than previously read we must seek
* forwards and discard data.
*/
if (offset > highest) {
int64_t remaining = offset - highest;
static char discard[4096];

while (remaining > 0) {
n = remaining > sizeof discard ? sizeof discard : remaining;
r = read (fd, discard, n);
if (r == -1) {
nbdkit_error ("read: %m");
errorstate = true;
return -1;
}
if (r == 0) {
nbdkit_error ("read: unexpected end of file reading from the pipe");
errorstate = true;
return -1;
}
highest += r;
remaining -= r;
}
}

/* Read data from the pipe into the return buffer. */
while (count > 0) {
r = read (fd, buf, count);
if (r == -1) {
nbdkit_error ("read: %m");
errorstate = true;
return -1;
}
if (r == 0) {
nbdkit_error ("read: unexpected end of file reading from the pipe");
errorstate = true;
return -1;
}
buf += r;
highest += r;
count -= r;
}

return 0;
}

/* WRITE_MODE */
else {
/* Allow reads which are entirely >= highest. These return zeroes. */
if (offset >= highest) {
memset (buf, 0, count);
return 0;
}

nbdkit_error ("client tried to read, but the streaming plugin is "
"being used in write mode (write= parameter)");
errorstate = true;
errno = EIO;
return -1;
}
}

/* Write data to the stream. */
static int
streaming_pwrite (void *handle, const void *buf,
Expand All @@ -168,23 +293,28 @@ streaming_pwrite (void *handle, const void *buf,
size_t n;
ssize_t r;

/* This can never happen because streaming_can_write above returns
* false in read mode.
*/
assert (mode == WRITE_MODE);

if (errorstate) {
nbdkit_error ("unrecoverable error state");
errno = EIO;
return -1;
}

if (offset < highestwrite) {
if (offset < highest) {
nbdkit_error ("client tried to seek backwards and write: "
"the streaming plugin does not currently support this");
"the streaming plugin does not support this");
errorstate = true;
errno = EIO;
return -1;
}

/* Need to write some zeroes. */
if (offset > highestwrite) {
int64_t remaining = offset - highestwrite;
if (offset > highest) {
int64_t remaining = offset - highest;
static char zerobuf[4096];

while (remaining > 0) {
Expand All @@ -195,7 +325,7 @@ streaming_pwrite (void *handle, const void *buf,
errorstate = true;
return -1;
}
highestwrite += r;
highest += r;
remaining -= r;
}
}
Expand All @@ -209,36 +339,13 @@ streaming_pwrite (void *handle, const void *buf,
return -1;
}
buf += r;
highestwrite += r;
highest += r;
count -= r;
}

return 0;
}

/* Read data back from the stream. */
static int
streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
{
if (errorstate) {
nbdkit_error ("unrecoverable error state");
errno = EIO;
return -1;
}

/* Allow reads which are entirely >= highestwrite. These return zeroes. */
if (offset >= highestwrite) {
memset (buf, 0, count);
return 0;
}

nbdkit_error ("client tried to read: "
"the streaming plugin does not currently support this");
errorstate = 1;
errno = EIO;
return -1;
}

static struct nbdkit_plugin plugin = {
.name = "streaming",
.longname = "nbdkit streaming plugin",
Expand All @@ -249,9 +356,10 @@ static struct nbdkit_plugin plugin = {
.config_help = streaming_config_help,
.get_ready = streaming_get_ready,
.open = streaming_open,
.can_write = streaming_can_write,
.get_size = streaming_get_size,
.pwrite = streaming_pwrite,
.pread = streaming_pread,
.pwrite = streaming_pwrite,
.errno_is_preserved = 1,
};

Expand Down
2 changes: 2 additions & 0 deletions tests/Makefile.am
Expand Up @@ -864,9 +864,11 @@ endif HAVE_SSH
# streaming plugin test.
LIBNBD_TESTS += test-streaming
TESTS += \
test-streaming-qemu-read.sh \
test-streaming-qemu-write.sh \
$(NULL)
EXTRA_DIST += \
test-streaming-qemu-read.sh \
test-streaming-qemu-write.sh \
$(NULL)

Expand Down

0 comments on commit 2aa7b35

Please sign in to comment.