Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #6 from xen-org/trunk-ring3

Storage migration code
  • Loading branch information...
commit 230797aa6c063e4065f241213a741e2068c8725e 2 parents 4a556d4 + d38b32a
Andrei Lifchits andreil authored
12 control/tap-ctl-unpause.c
View
@@ -40,7 +40,7 @@
#include "tap-ctl.h"
int
-tap_ctl_unpause(const int id, const int minor, const char *params)
+tap_ctl_unpause(const int id, const int minor, const char *params, int flags, char *secondary)
{
int err;
tapdisk_message_t message;
@@ -48,10 +48,20 @@ tap_ctl_unpause(const int id, const int minor, const char *params)
memset(&message, 0, sizeof(message));
message.type = TAPDISK_MESSAGE_RESUME;
message.cookie = minor;
+ message.u.params.flags = flags;
if (params)
strncpy(message.u.params.path, params,
sizeof(message.u.params.path) - 1);
+ if (secondary) {
+ err = snprintf(message.u.params.secondary,
+ sizeof(message.u.params.secondary) - 1, "%s",
+ secondary);
+ if (err >= sizeof(message.u.params.secondary)) {
+ EPRINTF("secondary image name too long\n");
+ return ENAMETOOLONG;
+ }
+ }
err = tap_ctl_connect_send_and_receive(id, &message, NULL);
if (err)
15 control/tap-ctl.c
View
@@ -609,21 +609,24 @@ tap_cli_pause(int argc, char **argv)
static void
tap_cli_unpause_usage(FILE *stream)
{
- fprintf(stream, "usage: unpause <-p pid> <-m minor> [-a args]\n");
+ fprintf(stream, "usage: unpause <-p pid> <-m minor> [-a args] [-2 secondary]\n");
}
int
tap_cli_unpause(int argc, char **argv)
{
const char *args;
- int c, pid, minor;
+ char *secondary;
+ int c, pid, minor, flags;
pid = -1;
minor = -1;
args = NULL;
+ secondary = NULL;
+ flags = 0;
optind = 0;
- while ((c = getopt(argc, argv, "p:m:a:h")) != -1) {
+ while ((c = getopt(argc, argv, "p:m:a:2:h")) != -1) {
switch (c) {
case 'p':
pid = atoi(optarg);
@@ -634,6 +637,10 @@ tap_cli_unpause(int argc, char **argv)
case 'a':
args = optarg;
break;
+ case '2':
+ flags |= TAPDISK_MESSAGE_FLAG_SECONDARY;
+ secondary = optarg;
+ break;
case '?':
goto usage;
case 'h':
@@ -645,7 +652,7 @@ tap_cli_unpause(int argc, char **argv)
if (pid == -1 || minor == -1)
goto usage;
- return tap_ctl_unpause(pid, minor, args);
+ return tap_ctl_unpause(pid, minor, args, flags, secondary);
usage:
tap_cli_unpause_usage(stderr);
7 drivers/Makefile.am
View
@@ -32,6 +32,9 @@ libtapdisk_la_SOURCES += tapdisk-vbd.h
libtapdisk_la_SOURCES += linux-blktap.h
libtapdisk_la_SOURCES += tapdisk-blktap.c
libtapdisk_la_SOURCES += tapdisk-blktap.h
+libtapdisk_la_SOURCES += tapdisk-nbdserver.c
+libtapdisk_la_SOURCES += tapdisk-nbdserver.h
+libtapdisk_la_SOURCES += tapdisk-nbd.h
libtapdisk_la_SOURCES += tapdisk-image.c
libtapdisk_la_SOURCES += tapdisk-image.h
libtapdisk_la_SOURCES += tapdisk-driver.c
@@ -67,6 +70,8 @@ libtapdisk_la_SOURCES += lock.c
libtapdisk_la_SOURCES += lock.h
libtapdisk_la_SOURCES += atomicio.c
libtapdisk_la_SOURCES += atomicio.h
+libtapdisk_la_SOURCES += tapdisk-fdreceiver.c
+libtapdisk_la_SOURCES += tapdisk-fdreceiver.h
libtapdisk_la_SOURCES += block-aio.c
libtapdisk_la_SOURCES += block-ram.c
@@ -77,6 +82,8 @@ libtapdisk_la_SOURCES += block-valve.h
libtapdisk_la_SOURCES += block-vindex.c
libtapdisk_la_SOURCES += block-lcache.c
libtapdisk_la_SOURCES += block-llcache.c
+libtapdisk_la_SOURCES += block-export.c
+libtapdisk_la_SOURCES += block-nbd.c
libtapdisk_la_LIBADD = ../vhd/lib/libvhd.la
libtapdisk_la_LIBADD += -laio
354 drivers/block-export.c
View
@@ -0,0 +1,354 @@
+#include <errno.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/types.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include "tapdisk.h"
+#include "tapdisk-server.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-interface.h"
+
+#define INFO(_f, _a...) tlog_syslog(TLOG_INFO, "export: " _f, ##_a)
+#define ERROR(_f, _a...) tlog_syslog(TLOG_WARN, "export: " _f, ##_a)
+
+struct vdi_chunk_format { /* Header sent in front of every chunk payload; packed, so it holds no padding */
+ uint64_t offset; /* byte offset of the chunk, as passed to tdexport_send_chunk() */
+ uint32_t length; /* payload length in bytes; the terminator chunk uses offset 0 and length 0 */
+ char payload[0]; /* zero-length trailing array; the senders in this file transmit the payload with a separate send() */
+} __attribute__((__packed__));
+
+struct vdi_chunk { /* NOTE(review): not referenced by any code visible in this file; looks like send-progress bookkeeping - confirm before relying on it */
+ unsigned long bytes_to_send; /* presumably bytes still outstanding - TODO confirm */
+ unsigned long bytes_sent; /* presumably bytes already transmitted - TODO confirm */
+ int send_terminator; /* presumably a flag requesting a terminator chunk - TODO confirm */
+ struct vdi_chunk_format chunk; /* header of the chunk being tracked */
+};
+
+struct tdexport_data { /* Per-driver state of the export driver; zeroed by tdexport_open() */
+ /* Socket details */
+ int socket; /* connected TCP socket; 0 is used throughout this file to mean "not open" */
+ struct sockaddr_in *remote; /* heap-allocated peer address built in tdexport_connect_import_session() */
+ char *peer_ip; /* heap copy of the dotted IPv4 address parsed from the image name */
+
+ /* HTTP import protocol details */
+ char *session_ref; /* heap copy of the session id placed in the PUT query string */
+ char *vdi_ref; /* heap copy of the VDI reference placed in the PUT query string */
+};
+
+/*
+ * Connect to the import endpoint on prv->peer_ip (TCP port 80), send the
+ * HTTP PUT request built from prv->session_ref and prv->vdi_ref, and wait
+ * until the end of the response headers ("\r\n\r\n") has been received.
+ *
+ * Returns the connected socket (also stored in prv->socket) on success.
+ * Returns -1 on failure; the socket is then closed and prv->remote is
+ * freed and reset to NULL.
+ */
+int tdexport_connect_import_session(struct tdexport_data *prv)
+{
+    int sock;
+    int opt = 1;
+    int rc;
+    int len;
+    int done;
+    char *msg = NULL;
+
+    sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    if (sock < 0) {
+        ERROR("Could not create socket: %s\n", strerror(errno));
+        return -1;
+    }
+
+    rc = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt));
+    if (rc < 0) {
+        ERROR("Could not set TCP_NODELAY: %s\n", strerror(errno));
+        goto fail;
+    }
+
+    /* Allocate the structure itself, not the size of a pointer to it. */
+    prv->remote = calloc(1, sizeof(*prv->remote));
+    if (!prv->remote) {
+        ERROR("struct sockaddr_in malloc failure\n");
+        goto fail;
+    }
+    prv->remote->sin_family = AF_INET;
+    rc = inet_pton(AF_INET, prv->peer_ip, &(prv->remote->sin_addr.s_addr));
+    if (rc < 0) {
+        ERROR("Could not create inaddr: %s\n", strerror(errno));
+        goto fail;
+    }
+    if (rc == 0) {
+        ERROR("inet_pton parse error\n");
+        goto fail;
+    }
+    prv->remote->sin_port = htons(80);
+
+    if (connect(sock, (struct sockaddr *)prv->remote, sizeof(*prv->remote)) < 0) {
+        ERROR("Could not connect to peer: %s\n", strerror(errno));
+        goto fail;
+    }
+
+#define IMPORT_URL_FMT "PUT /import_raw_vdi?session_id=%s&vdi=%s&chunked=true&o_direct=true HTTP/1.0\r\n\r\n"
+    /* The two "%s" in the format leave more than enough room for the NUL. */
+    len = strlen(IMPORT_URL_FMT) +
+          strlen(prv->session_ref) +
+          strlen(prv->vdi_ref) + 1;
+    msg = malloc(len);
+    if (!msg) {
+        ERROR("HTTP PUT message malloc failure\n");
+        goto fail;
+    }
+    snprintf(msg, len, IMPORT_URL_FMT, prv->session_ref, prv->vdi_ref);
+    INFO("Using query: %s\n", msg);
+
+    /* send() may accept only part of the request; keep going until done. */
+    len = strlen(msg);
+    for (done = 0; done < len; done += rc) {
+        rc = send(sock, msg + done, len - done, 0);
+        if (rc < 0) {
+            ERROR("Error sending PUT request: %s\n", strerror(errno));
+            goto fail;
+        }
+    }
+    free(msg);
+
+    /* Wait for HTTP/1.1 200 OK and the double \r\n */
+#define RECV_BUFFER_SIZE 4096
+    msg = malloc(RECV_BUFFER_SIZE);
+    if (!msg) {
+        ERROR("Receive buffer malloc failure\n");
+        goto fail;
+    }
+
+    /*
+     * Accumulate the response so that a terminator split across two
+     * recv() calls is still found, and keep the buffer NUL-terminated
+     * because strstr() needs a C string.
+     */
+    done = 0;
+    for (;;) {
+        if (done >= RECV_BUFFER_SIZE - 1) {
+            ERROR("HTTP response headers too long\n");
+            goto fail;
+        }
+        rc = recv(sock, msg + done, RECV_BUFFER_SIZE - 1 - done, 0);
+        if (rc < 0) {
+            ERROR("Error receiving from peer: %s", strerror(errno));
+            goto fail;
+        }
+        if (rc == 0) {
+            /* Peer closed the connection: looping again would spin forever. */
+            ERROR("Peer closed connection before end of HTTP headers\n");
+            goto fail;
+        }
+        done += rc;
+        msg[done] = '\0';
+        if (strstr(msg, "\r\n\r\n"))
+            break;
+    }
+    if (strstr(msg, "HTTP/1.1 200 OK")) {
+        INFO("Received from peer: HTTP/1.1 200 OK\n");
+    }
+    free(msg);
+
+    prv->socket = sock;
+    return sock;
+
+fail:
+    free(msg);
+    free(prv->remote);
+    prv->remote = NULL;
+    close(sock);
+    return -1;
+}
+
+/*
+ * Send one chunk to the import peer: a vdi_chunk_format header carrying
+ * offset and length, followed by length bytes taken from buffer.
+ *
+ * Returns 0 on success and -1 if there is no open socket or a send fails.
+ */
+int tdexport_send_chunk(struct tdexport_data *prv, const char *buffer, uint32_t length, uint64_t offset)
+{
+    struct vdi_chunk_format chunk;
+    ssize_t rc;
+    uint32_t bytes_to_send;
+    uint32_t bytes_sent;
+
+    INFO("Sending chunk of size %u at offset %"PRIu64"\n", length, offset);
+    if (prv->socket == 0) {
+        ERROR("Cannot send chunk without an open socket\n");
+        return -1;
+    }
+
+    chunk.length = length;
+    chunk.offset = offset;
+    rc = send(prv->socket, &chunk, sizeof(chunk), 0);
+    if (rc < 0) {
+        ERROR("Error sending chunk header: %s\n", strerror(errno));
+        return -1;
+    }
+    if (rc != (ssize_t)sizeof(chunk)) {
+        /*
+         * A truncated header leaves the stream out of step with the
+         * receiver, so the payload must not follow it.
+         */
+        ERROR("Send of %d bytes of chunk header does not match expected %d\n", (int)rc, (int)sizeof(chunk));
+        return -1;
+    }
+
+    bytes_to_send = length;
+    bytes_sent = 0;
+
+    /* send() may take only part of the payload; loop until all is out. */
+    while (bytes_to_send) {
+        rc = send(prv->socket, buffer + bytes_sent, bytes_to_send, 0);
+        if (rc < 0) {
+            ERROR("Error sending chunk payload: %s\n", strerror(errno));
+            return -1;
+        }
+        if ((uint32_t)rc > bytes_to_send)
+            rc = bytes_to_send;
+        bytes_sent += (uint32_t)rc;
+        bytes_to_send -= (uint32_t)rc;
+    }
+
+    return 0;
+}
+
+/*
+ * Read the 32-bit result word the peer sends after the terminator chunk.
+ *
+ * Returns 0 when a complete result equal to zero was received, and -1 when
+ * there is no open socket, the read fails or is incomplete, or the result
+ * is non-zero.
+ */
+int tdexport_recv_result(struct tdexport_data *prv)
+{
+    uint32_t result = 0;
+    ssize_t rc;
+
+    if (prv->socket == 0) {
+        ERROR("Cannot receive result without an open socket\n");
+        return -1;
+    }
+    rc = recv(prv->socket, &result, sizeof(result), 0);
+    if (rc < 0) {
+        ERROR("Error receiving result: %s\n", strerror(errno));
+        return -1;
+    }
+    if (rc != (ssize_t)sizeof(result)) {
+        /* EOF or a partial word: the value would be meaningless. */
+        ERROR("Short read of %d bytes while receiving result\n", (int)rc);
+        return -1;
+    }
+    if (result != 0) {
+        ERROR("Received non-zero result: %u (mirror is out-of-sync)", (unsigned)result);
+        return -1;
+    }
+
+    INFO("Received successful result; mirror is synchronised OK\n");
+
+    return 0;
+}
+
+/*
+ * Send the terminator: a chunk header whose offset and length are both 0.
+ *
+ * Returns 0 on success and -1 if there is no open socket or the header
+ * could not be sent in full.
+ */
+int tdexport_send_terminator_chunk(struct tdexport_data *prv)
+{
+    struct vdi_chunk_format zero;
+    ssize_t rc;
+
+    if (prv->socket == 0) {
+        ERROR("Cannot send terminator chunk without an open socket\n");
+        return -1;
+    }
+
+    zero.offset = 0;
+    zero.length = 0;
+
+    rc = send(prv->socket, &zero, sizeof(zero), 0);
+    if (rc < 0) {
+        ERROR("Error sending chunk terminator: %s\n", strerror(errno));
+        return -1;
+    }
+    if (rc != (ssize_t)sizeof(zero)) {
+        /* Both arguments are cast to int, so both conversions are %d. */
+        ERROR("Send of %d bytes of terminator does not match expected %d\n", (int)rc, (int)sizeof(zero));
+        return -1;
+    }
+    INFO("Terminator chunk written successfully\n");
+    return 0;
+}
+
+/* -- interface -- */
+
+static int tdexport_close(td_driver_t*);
+
+/*
+ * Open an export image. name has the form "<peer-ip>/<session>/<vdi>";
+ * each part is limited to 255 characters. The three parts are copied into
+ * the driver's private data and the import session is connected.
+ *
+ * Returns 0 on success and -1 on failure. On failure everything acquired
+ * so far is released through tdexport_close().
+ */
+static int tdexport_open(td_driver_t* driver, const char* name, td_flag_t flags)
+{
+    struct tdexport_data *prv;
+    char peer_ip[256];
+    char session_ref[256];
+    char vdi_ref[256];
+    int rc;
+
+    driver->info.sector_size = 512;
+    driver->info.info = 0;
+
+    prv = (struct tdexport_data *)driver->data;
+    memset(prv, 0, sizeof(struct tdexport_data));
+
+    INFO("Opening export to %s\n", name);
+
+    rc = sscanf(name, "%255[^/]/%255[^/]/%255[^\n]", peer_ip, session_ref, vdi_ref);
+    if (rc != 3) {
+        ERROR("Could not parse export URL");
+        return -1;
+    }
+    prv->peer_ip = malloc(strlen(peer_ip) + 1);
+    prv->session_ref = malloc(strlen(session_ref) + 1);
+    prv->vdi_ref = malloc(strlen(vdi_ref) + 1);
+    if (!prv->peer_ip || !prv->session_ref || !prv->vdi_ref) {
+        ERROR("Failure to malloc for URL parts");
+        goto fail;
+    }
+    strcpy(prv->peer_ip, peer_ip);
+    strcpy(prv->session_ref, session_ref);
+    strcpy(prv->vdi_ref, vdi_ref);
+
+    INFO("Export peer=%s session=%s VDI=%s\n", prv->peer_ip, prv->session_ref, prv->vdi_ref);
+    if (tdexport_connect_import_session(prv) < 0)
+        goto fail;
+
+    return 0;
+
+fail:
+    /*
+     * prv->socket is still 0 on every path that reaches this point, so
+     * tdexport_close() only frees the strings and sends nothing.
+     */
+    tdexport_close(driver);
+    return -1;
+}
+
+/*
+ * Close an export image. If a socket is open, send the terminator chunk,
+ * read the peer's result and close the socket. Then release every heap
+ * allocation held in the private data. Safe to call more than once: all
+ * released fields are reset.
+ *
+ * Always returns 0.
+ */
+static int tdexport_close(td_driver_t* driver)
+{
+    struct tdexport_data *prv;
+
+    prv = (struct tdexport_data *)driver->data;
+    if (prv->socket) {
+        tdexport_send_terminator_chunk(prv);
+        tdexport_recv_result(prv);
+        close(prv->socket);
+        prv->socket = 0;
+    }
+
+    /* free(NULL) is a no-op, so no guards are needed. */
+    free(prv->remote);
+    prv->remote = NULL;
+    free(prv->peer_ip);
+    prv->peer_ip = NULL;
+    free(prv->session_ref);
+    prv->session_ref = NULL;
+    free(prv->vdi_ref);
+    prv->vdi_ref = NULL;
+
+    return 0;
+}
+
+/*
+ * Read handler: log the byte offset and size of the request, then hand it
+ * on unchanged with td_forward_request().
+ */
+static void tdexport_queue_read(td_driver_t* driver, td_request_t treq)
+{
+    uint64_t byte_offset;
+    int byte_count;
+
+    byte_count = treq.secs * driver->info.sector_size;
+    byte_offset = treq.sec * (uint64_t)driver->info.sector_size;
+
+    INFO("READ %"PRIu64" (%d)\n", byte_offset, byte_count);
+
+    td_forward_request(treq);
+}
+
+/*
+ * Write handler: send the request's data to the import peer as one chunk
+ * and complete the request. A failed send completes the request with -EIO
+ * instead of reporting success.
+ */
+static void tdexport_queue_write(td_driver_t* driver, td_request_t treq)
+{
+    struct tdexport_data *prv = (struct tdexport_data *)driver->data;
+    int size = treq.secs * driver->info.sector_size;
+    uint64_t offset = treq.sec * (uint64_t)driver->info.sector_size;
+    int rc;
+
+    /* The "0x" prefix needs a hexadecimal conversion to be truthful. */
+    INFO("WRITE 0x%"PRIx64" (%d)\n", offset, size);
+
+    rc = tdexport_send_chunk(prv, treq.buf, size, offset);
+
+    /* NOTE(review): negative errno is assumed to be the completion convention - confirm. */
+    td_complete_request(treq, rc ? -EIO : 0);
+}
+
+static int tdexport_get_parent_id(td_driver_t* driver, td_disk_id_t* id)
+{ /* Parent lookup callback: ignores both arguments and always returns -EINVAL */
+ return -EINVAL;
+}
+
+static int tdexport_validate_parent(td_driver_t *driver,
+ td_driver_t *parent, td_flag_t flags)
+{ /* Parent validation callback: performs no checks and always returns 0 */
+ return 0;
+}
+
+struct tap_disk tapdisk_export = { /* Driver descriptor binding the tdexport_* callbacks of this file */
+ .disk_type = "tapdisk_export",
+ .private_data_size = sizeof(struct tdexport_data), /* presumably the size of the area handed over as driver->data - confirm */
+ .flags = 0,
+ .td_open = tdexport_open,
+ .td_close = tdexport_close,
+ .td_queue_read = tdexport_queue_read, /* logs and forwards the request */
+ .td_queue_write = tdexport_queue_write, /* sends the data to the peer, then completes the request */
+ .td_get_parent_id = tdexport_get_parent_id, /* always -EINVAL */
+ .td_validate_parent = tdexport_validate_parent, /* always 0 */
+};
847 drivers/block-nbd.c
View
@@ -0,0 +1,847 @@
+#include <errno.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/types.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include "tapdisk.h"
+#include "tapdisk-server.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-interface.h"
+#include "tapdisk-utils.h"
+#include "tapdisk-fdreceiver.h"
+#include "tapdisk-nbd.h"
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#define INFO(_f, _a...) tlog_syslog(TLOG_INFO, "nbd: " _f, ##_a)
+#define ERROR(_f, _a...) tlog_syslog(TLOG_WARN, "nbd: " _f, ##_a)
+
+#define N_PASSED_FDS 10
+#define TAPDISK_NBDCLIENT_MAX_PATH_LEN 256
+#define TAPDISK_NBDCLIENT_LISTENING_SOCK_PATH "/var/run/blktap-control/nbdclient"
+#define MAX_NBD_REQS TAPDISK_DATA_REQUESTS
+#define NBD_TIMEOUT 30
+
+/* We'll only ever have one nbdclient fd receiver per tapdisk process,
+ so let's just store it here globally. We'll also keep track of the
+ passed fds here too. */
+
+struct td_fdreceiver *fdreceiver=NULL;
+
+struct tdnbd_passed_fd { /* One slot of the global table of file descriptors stashed by tdnbd_stash_passed_fd() */
+ char id[40]; /* name stored with the fd; filled by strncpy, so it has no NUL terminator when the name is 40 bytes or longer */
+ struct timeval t; /* gettimeofday() value taken when the fd was stashed */
+ int fd; /* the descriptor, or -1 while the slot is free */
+} passed_fds[N_PASSED_FDS];
+
+
+struct nbd_queued_io { /* Progress of one buffer that tdnbd_write_some()/tdnbd_read_some() move in several steps */
+ char *buffer; /* start of the data area */
+ int len; /* total number of bytes to transfer */
+ int so_far; /* bytes transferred so far; len - so_far is what remains */
+};
+
+struct td_nbd_request { /* One NBD request slot from tdnbd_data.requests[]; linked into the free, pending or sent list */
+ td_request_t treq; /* originating tapdisk request, completed via td_complete_request() */
+ struct nbd_request nreq; /* wire header, fields stored in network byte order */
+ int timeout_event; /* id of the per-request timeout event, or -1 when none is registered */
+ int fake; /* stored by tdnbd_queue_request(); not read by any code visible in this file */
+ struct nbd_queued_io header; /* points to the request struct above */
+ struct nbd_queued_io body; /* in or out, depending on whether type is read or write */
+ struct list_head queue; /* link for whichever list currently holds the slot */
+};
+
+struct tdnbd_data /* Per-driver state of the NBD client; zeroed and initialised by tdnbd_open() */
+{
+ int writer_event_id; /* id of the write-ready event, or -1 while the write queue is disabled */
+ struct list_head sent_reqs; /* requests fully written and awaiting a reply */
+ struct list_head pending_reqs; /* requests queued but not yet fully written */
+ struct list_head free_reqs; /* unused request slots */
+ struct td_nbd_request requests[MAX_NBD_REQS]; /* backing storage for all slots */
+ int nr_free_count; /* number of slots currently on free_reqs */
+
+ int reader_event_id; /* id of the read-ready event registered in tdnbd_open() */
+ struct nbd_reply current_reply; /* reply header being received */
+ struct nbd_queued_io cur_reply_qio; /* receive progress over current_reply */
+ struct td_nbd_request *current_reply_request; /* request matched to current_reply, or NULL before it is looked up */
+
+ int socket; /* connected socket, or -1 once closed */
+ struct sockaddr_in *remote; /* heap-allocated peer address; NULL when a passed fd is used */
+ char *peer_ip; /* heap copy of the peer address string; NULL when a passed fd is used */
+ int port; /* peer TCP port; -1 when a passed fd is used */
+ char *name; /* heap copy of the passed-fd name; NULL for host:port connections */
+
+ int flags; /* open flags; TD_OPEN_SECONDARY makes reads get forwarded */
+ int closed; /* 0 = open, 2 = disconnect request written, 3 = fully disabled (values as set in this file) */
+};
+
+int global_id=0;
+
+void disable_write_queue(struct tdnbd_data *prv);
+
+
+
+/* -- fdreceiver bits and pieces -- */
+
+void
+tdnbd_stash_passed_fd(int fd, char *msg, void *data) /* Store fd under the name msg in the first free passed_fds slot; data is unused */
+{
+ int free_index=-1;
+ int i;
+ for(i=0; i<N_PASSED_FDS; i++) { /* a slot is free when its fd is -1 */
+ if(passed_fds[i].fd == -1) {
+ free_index=i;
+ break;
+ }
+ }
+
+ if(free_index==-1) { /* table full: the fd is closed rather than kept */
+ ERROR("Error - more than %d fds passed! cannot stash another.",N_PASSED_FDS);
+ close(fd);
+ return;
+ }
+
+ passed_fds[free_index].fd=fd;
+ strncpy(passed_fds[free_index].id, msg, sizeof(passed_fds[free_index].id)); /* NOTE(review): id is left without a NUL terminator when msg is 40 bytes or longer; lookups use strncmp bounded by the same size */
+ gettimeofday(&passed_fds[free_index].t, NULL); /* remember when the fd arrived */
+
+}
+
+/*
+ * Look up a stashed file descriptor by name and take it out of the table.
+ *
+ * Returns the descriptor, whose slot is marked free again, or -1 when no
+ * occupied slot carries that name.
+ */
+int tdnbd_retreive_passed_fd(const char *name)
+{
+    int fd, i;
+
+    for (i = 0; i < N_PASSED_FDS; i++) {
+        /*
+         * A freed slot keeps its old id, so it has to be skipped:
+         * otherwise a stale entry could shadow a live one with the
+         * same name and make the lookup return -1.
+         */
+        if (passed_fds[i].fd == -1)
+            continue;
+
+        if (strncmp(name, passed_fds[i].id, sizeof(passed_fds[i].id)) == 0) {
+            fd = passed_fds[i].fd;
+            passed_fds[i].fd = -1;
+            return fd;
+        }
+    }
+
+    ERROR("Couldn't find the fd named: %s",name);
+
+    return -1;
+}
+
+/*
+ * Mark every passed_fds slot as free and start the fd receiver on a socket
+ * path made of TAPDISK_NBDCLIENT_LISTENING_SOCK_PATH followed by this
+ * process's pid. Received descriptors go to tdnbd_stash_passed_fd().
+ */
+void
+tdnbd_fdreceiver_start()
+{
+    char sock_path[TAPDISK_NBDCLIENT_MAX_PATH_LEN];
+    int slot = 0;
+
+    /* initialise the passed fds list */
+    while (slot < N_PASSED_FDS) {
+        passed_fds[slot].fd = -1;
+        slot++;
+    }
+
+    snprintf(sock_path, TAPDISK_NBDCLIENT_MAX_PATH_LEN, "%s%d",
+             TAPDISK_NBDCLIENT_LISTENING_SOCK_PATH, getpid());
+
+    fdreceiver = td_fdreceiver_start(sock_path, tdnbd_stash_passed_fd, NULL);
+}
+
+
+void __cancel_req(int i, struct td_nbd_request *pos, int e) /* Fail one request with error e; i is only an index for the log line */
+{
+ char handle[9];
+ memcpy(handle,pos->nreq.handle,8); /* the 8-byte handle is copied and terminated locally so it can be printed with %s */
+ handle[8]=0;
+ INFO("Entry %d: handle='%s' type=%d -- reporting errno: %d",i,handle,ntohl(pos->nreq.type), e);
+
+ if(pos->timeout_event >= 0) { /* drop the per-request timeout, if one is registered */
+ tapdisk_server_unregister_event(pos->timeout_event);
+ pos->timeout_event = -1;
+ }
+
+ td_complete_request(pos->treq, e); /* the request is not unlinked from its list here */
+}
+
+/*
+ * Shut the NBD client down: unregister the reader and writer events, fail
+ * every sent and pending request with error e, return their slots to the
+ * free list and mark the connection as fully disabled (closed == 3).
+ *
+ * Calling it again is harmless: the event ids are reset to -1 and the
+ * request lists are empty after the first call.
+ */
+void tdnbd_disable(struct tdnbd_data *prv, int e)
+{
+    struct td_nbd_request *pos, *q;
+    int i = 0;
+
+    INFO("NBD client full-disable");
+
+    /* Reset the ids so nobody unregisters the same event a second time. */
+    if (prv->writer_event_id >= 0) {
+        tapdisk_server_unregister_event(prv->writer_event_id);
+        prv->writer_event_id = -1;
+    }
+    if (prv->reader_event_id >= 0) {
+        tapdisk_server_unregister_event(prv->reader_event_id);
+        prv->reader_event_id = -1;
+    }
+
+    /*
+     * Each cancelled request goes back to the free list. Left in place it
+     * would be completed again by a later disable, and its slot would
+     * never become available again.
+     */
+    list_for_each_entry_safe(pos, q, &prv->sent_reqs, queue) {
+        __cancel_req(i++, pos, e);
+        list_move(&pos->queue, &prv->free_reqs);
+        prv->nr_free_count++;
+    }
+
+    list_for_each_entry_safe(pos, q, &prv->pending_reqs, queue) {
+        __cancel_req(i++, pos, e);
+        list_move(&pos->queue, &prv->free_reqs);
+        prv->nr_free_count++;
+    }
+
+    INFO("Setting closed");
+    prv->closed = 3;
+}
+
+/* NBD writer queue */
+
+
+/*
+ * Push as much of data as the socket accepts, advancing data->so_far.
+ *
+ * Returns the number of bytes still to be written (0 once everything has
+ * gone out, positive when the socket would block), or a negative value on
+ * a send error or when send() reports 0 bytes.
+ */
+int tdnbd_write_some(int fd, struct nbd_queued_io *data)
+{
+    int remaining = data->len - data->so_far;
+    int sent;
+    char *reason;
+
+    for (; remaining > 0; remaining -= sent, data->so_far += sent) {
+        sent = send(fd, data->buffer + data->so_far, remaining, 0);
+
+        if (sent == 0) {
+            ERROR("Server shutdown prematurely in write_some");
+            return -1;
+        }
+
+        if (sent != -1)
+            continue;
+
+        /* Socket is full: report what is still outstanding. */
+        if (errno == EAGAIN || errno == EWOULDBLOCK)
+            return remaining;
+
+        reason = strerror(errno);
+        ERROR("Bad return code %d from send (%s)",sent,(reason==0 ? "unknown" : reason));
+        return sent;
+    }
+
+    return remaining;
+}
+
+/*
+ * Receive as much of data as the socket currently offers, advancing
+ * data->so_far.
+ *
+ * Returns the number of bytes still missing (0 once the buffer is
+ * complete, positive when the socket would block), or a negative value on
+ * a receive error or when the peer has shut the connection down.
+ */
+int tdnbd_read_some(int fd, struct nbd_queued_io *data)
+{
+    int left = data->len - data->so_far;
+    int rc;
+    char *code;
+
+    while (left > 0) {
+        rc = recv(fd, data->buffer + data->so_far, left, 0);
+
+        if (rc == -1) {
+
+            if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
+                return left;
+            }
+
+            code = strerror(errno);
+            /* This is the receive path: name the call that really failed. */
+            ERROR("Bad return code %d from recv (%s)",rc,(code==0 ? "unknown" : code));
+            return rc;
+        }
+
+        if (rc == 0) {
+            ERROR("Server shutdown prematurely in read_some");
+            return -1;
+        }
+
+        data->so_far += rc;
+        left -= rc;
+    }
+
+    return left;
+}
+
+void tdnbd_timeout_cb(event_id_t eb, char mode, void *data) /* Timeout event callback; data is the struct tdnbd_data registered with the event, mode is unused */
+{
+ struct tdnbd_data *prv=data;
+
+ ERROR("Timeout!: %d",eb);
+
+ tdnbd_disable(prv, ETIMEDOUT); /* one timed-out request takes the whole connection down and fails every outstanding request */
+}
+
+/*
+ * Writer event callback: write out pending requests in order. A request
+ * that has been written completely moves to the sent list; a disconnect
+ * request goes straight back to the free list because no reply follows.
+ * Returns early when the socket would block. A send error disables the
+ * client instead of treating the request as written.
+ */
+void tdnbd_writer_cb(event_id_t eb, char mode, void *data)
+{
+    struct td_nbd_request *pos, *q;
+    struct tdnbd_data *prv = data;
+    int failed = 0;
+    int type;
+    int rc;
+
+    list_for_each_entry_safe(pos, q, &prv->pending_reqs, queue) {
+        rc = tdnbd_write_some(prv->socket, &pos->header);
+        if (rc > 0)
+            return;
+        if (rc < 0) {
+            failed = 1;
+            break;
+        }
+
+        type = ntohl(pos->nreq.type);
+        if (type == NBD_CMD_WRITE) {
+            rc = tdnbd_write_some(prv->socket, &pos->body);
+            if (rc > 0)
+                return;
+            if (rc < 0) {
+                failed = 1;
+                break;
+            }
+        }
+
+        if (type == NBD_CMD_DISC) {
+            INFO("sent close request");
+            /* We don't expect a response from a DISC, so move the request back onto the free list */
+            list_move(&pos->queue, &prv->free_reqs);
+            prv->nr_free_count++;
+            prv->closed = 2;
+        } else {
+            list_move(&pos->queue, &prv->sent_reqs);
+        }
+    }
+
+    if (failed) {
+        ERROR("Error writing request to server");
+        /*
+         * tdnbd_close() queues its disconnect request with an empty
+         * treq, so such a request must not be completed by the
+         * cancellation below: release its slot first.
+         */
+        list_for_each_entry_safe(pos, q, &prv->pending_reqs, queue) {
+            if (ntohl(pos->nreq.type) == NBD_CMD_DISC) {
+                list_move(&pos->queue, &prv->free_reqs);
+                prv->nr_free_count++;
+            }
+        }
+        tdnbd_disable(prv, EIO);
+        return;
+    }
+
+    /* If we're here, we've written everything */
+
+    disable_write_queue(prv);
+
+    if (prv->closed == 2) {
+        tdnbd_disable(prv, EIO);
+    }
+}
+
+/*
+ * Register tdnbd_writer_cb for write readiness on the socket unless it is
+ * registered already.
+ *
+ * Returns 0 when the event was already registered, otherwise the value
+ * returned by tapdisk_server_register_event().
+ */
+int enable_write_queue(struct tdnbd_data *prv)
+{
+    if (prv->writer_event_id < 0) {
+        prv->writer_event_id =
+            tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+                                          prv->socket,
+                                          0,
+                                          tdnbd_writer_cb,
+                                          prv);
+        return prv->writer_event_id;
+    }
+
+    return 0;
+}
+
+/*
+ * Unregister the writer event, if one is registered, and record that by
+ * setting writer_event_id to -1.
+ */
+void disable_write_queue(struct tdnbd_data *prv)
+{
+    if (prv->writer_event_id >= 0) {
+        tapdisk_server_unregister_event(prv->writer_event_id);
+        prv->writer_event_id = -1;
+    }
+}
+
+/*
+ * Queue one NBD request for transmission.
+ *
+ * prv     driver state
+ * type    NBD command (NBD_CMD_READ, NBD_CMD_WRITE or NBD_CMD_DISC)
+ * offset  byte offset on the export
+ * buffer  data to send (write) or area to fill (read)
+ * length  number of bytes in buffer
+ * treq    originating tapdisk request
+ * fake    stored in the request slot
+ *
+ * Returns 0 when the request was queued. Returns -ETIMEDOUT when the
+ * client is already disabled and -EBUSY when no request slot is free; in
+ * both cases treq is completed with that error here, because the callers
+ * in this file do not look at the return value. The empty treq of a
+ * disconnect request is never completed.
+ */
+int tdnbd_queue_request(struct tdnbd_data *prv, int type, uint64_t offset,
+    char *buffer, uint32_t length, td_request_t treq, int fake)
+{
+    struct td_nbd_request *req;
+    int id;
+
+    /* A dead connection wins over "no free slot": nothing will free one. */
+    if (prv->closed == 3) {
+        td_complete_request(treq, -ETIMEDOUT);
+        return -ETIMEDOUT;
+    }
+
+    if (prv->nr_free_count == 0) {
+        /* NOTE(review): completing with -EBUSY assumes the upper layer copes with (or retries) that error - confirm. */
+        if (type != NBD_CMD_DISC)
+            td_complete_request(treq, -EBUSY);
+        return -EBUSY;
+    }
+
+    req = list_entry(prv->free_reqs.next, struct td_nbd_request, queue);
+
+    /* fill in the request */
+
+    req->treq = treq;
+    id = global_id++;
+    snprintf(req->nreq.handle, 8, "td%05x", id % 0xffff);
+
+    /* No response from a disconnect, so no need for a timeout */
+    if (type != NBD_CMD_DISC) {
+        req->timeout_event = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                                           -1, /* dummy */
+                                                           NBD_TIMEOUT,
+                                                           tdnbd_timeout_cb,
+                                                           prv);
+    } else {
+        req->timeout_event = -1;
+    }
+
+    INFO("request: %s timeout %d",req->nreq.handle, req->timeout_event);
+
+    req->nreq.magic = htonl(NBD_REQUEST_MAGIC);
+    req->nreq.type = htonl(type);
+    req->nreq.from = htonll(offset);
+    req->nreq.len = htonl(length);
+    req->header.buffer = (char *)&req->nreq;
+    req->header.len = sizeof(req->nreq);
+    req->header.so_far = 0;
+    req->body.buffer = buffer;
+    req->body.len = length;
+    req->body.so_far = 0;
+    req->fake = fake;
+
+    list_move_tail(&req->queue, &prv->pending_reqs);
+    prv->nr_free_count--;
+
+    if (prv->writer_event_id < 0) {
+        enable_write_queue(prv);
+    }
+    return 0;
+}
+
+/* NBD Reader callback */
+
+/*
+ * Reader event callback: receive one reply header, find the sent request
+ * with the same handle, receive the data for a read, complete the request
+ * and return its slot to the free list. The callback returns early while
+ * data is still missing and resumes on the next invocation. Any error, a
+ * reply for an unknown handle, or a reply to an unexpected request type
+ * disables the client.
+ */
+void tdnbd_reader_cb(event_id_t eb, char mode, void *data)
+{
+    char handle[9];
+    int do_disable = 0;
+    struct tdnbd_data *prv = data;
+    struct td_nbd_request *req;
+    int rc;
+
+    /* Continue (or start) receiving the reply header. */
+    rc = tdnbd_read_some(prv->socket, &prv->cur_reply_qio);
+
+    if (rc < 0) {
+        ERROR("Error reading reply header: %d",rc);
+        tdnbd_disable(prv, EIO);
+        return;
+    }
+
+    if (rc > 0) {
+        return; /* need more data */
+    }
+
+    /* Got a header. */
+
+    if (prv->current_reply.error != 0) {
+        ERROR("Error in reply: %d",prv->current_reply.error);
+        tdnbd_disable(prv, EIO);
+        return;
+    }
+
+    /* Have we found the request yet? */
+
+    if (prv->current_reply_request == NULL) {
+        struct td_nbd_request *pos, *q;
+        list_for_each_entry_safe(pos, q, &prv->sent_reqs, queue) {
+            if (memcmp(pos->nreq.handle, prv->current_reply.handle, 8) == 0) {
+                prv->current_reply_request = pos;
+                break;
+            }
+        }
+
+        if (prv->current_reply_request == NULL) {
+            memcpy(handle, prv->current_reply.handle, 8);
+            handle[8] = 0;
+
+            ERROR("Couldn't find request corresponding to reply (reply handle='%s')",handle);
+            tdnbd_disable(prv, EIO);
+            return;
+        }
+    }
+
+    req = prv->current_reply_request;
+
+    switch (ntohl(req->nreq.type)) {
+    case NBD_CMD_READ:
+        rc = tdnbd_read_some(prv->socket, &req->body);
+
+        if (rc < 0) {
+            ERROR("Error reading body of request: %d",rc);
+            tdnbd_disable(prv, EIO);
+            return;
+        }
+
+        if (rc > 0) {
+            return; /* need more data */
+        }
+
+        td_complete_request(req->treq, 0);
+        break;
+
+    case NBD_CMD_WRITE:
+        td_complete_request(req->treq, 0);
+        break;
+
+    default:
+        ERROR("Unhandled request response: %d",ntohl(req->nreq.type));
+        /*
+         * Fall out of the switch rather than returning, so the slot is
+         * released below and the disable at the end actually runs.
+         */
+        do_disable = 1;
+        break;
+    }
+
+    /* remove the state */
+
+    list_move(&req->queue, &prv->free_reqs);
+    prv->nr_free_count++;
+
+    prv->cur_reply_qio.so_far = 0;
+    if (req->timeout_event >= 0) {
+        tapdisk_server_unregister_event(req->timeout_event);
+        /* Forget the id so it cannot be unregistered a second time. */
+        req->timeout_event = -1;
+    }
+
+    prv->current_reply_request = NULL;
+
+    /* Nb, do this here otherwise we cancel the request that has just been moved */
+
+    if (do_disable)
+        tdnbd_disable(prv, EIO);
+}
+
+/*
+ * Wait up to 10 seconds for fd to become readable.
+ *
+ * Returns the value of select(): positive when fd is readable, 0 on
+ * timeout and -1 on error.
+ */
+int tdnbd_wait_read(int fd)
+{
+    struct timeval limit = { .tv_sec = 10, .tv_usec = 0 };
+    fd_set readable;
+
+    FD_ZERO(&readable);
+    FD_SET(fd, &readable);
+
+    return select(fd + 1, &readable, NULL, NULL, &limit);
+}
+
+/*
+ * Wait for sock to become readable and receive exactly len bytes with one
+ * recv() call. step only numbers the log message.
+ * Returns 0 on success, -1 on timeout, error or short read.
+ */
+static int tdnbd_negotiate_recv(int sock, void *buf, size_t len, int step)
+{
+    int rc;
+
+    if (tdnbd_wait_read(sock) <= 0) {
+        ERROR("Timeout in nbd_negotiate");
+        return -1;
+    }
+
+    rc = recv(sock, buf, len, 0);
+    /*
+     * Test for a negative result before comparing against the unsigned
+     * length: -1 converted to size_t would look like a very long read.
+     */
+    if (rc < 0 || (size_t)rc < len) {
+        ERROR("Short read in negotiation(%d) (%d)\n", step, rc);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Run the NBD handshake on prv->socket and fill driver->info from it.
+ *
+ * Returns 0 on success, leaving the socket in non-blocking mode.
+ * Returns -1 on any failure; the socket has then been closed.
+ */
+int tdnbd_nbd_negotiate(struct tdnbd_data *prv, td_driver_t *driver)
+{
+#define RECV_BUFFER_SIZE 256
+    int rc;
+    char buffer[RECV_BUFFER_SIZE];
+    uint64_t magic;
+    uint64_t size;
+    uint32_t flags;
+    int padbytes = 124;
+    int sock = prv->socket;
+
+    /* NBD negotiation protocol:
+     *
+     * Server sends 'NBDMAGIC'
+     * then it sends 0x00420281861253L
+     * then it sends a 64 bit bigendian size
+     * then it sends a 32 bit bigendian flags
+     * then it sends 124 bytes of nothing
+     *
+     */
+
+    /* We need to limit the time we spend in this function
+       as we're still using blocking IO at this point */
+
+    if (tdnbd_negotiate_recv(sock, buffer, 8, 1) < 0)
+        goto fail;
+
+    if (memcmp(buffer, "NBDMAGIC", 8) != 0) {
+        buffer[8] = 0;
+        ERROR("Error in NBD negotiation: got '%s'",buffer);
+        goto fail;
+    }
+
+    if (tdnbd_negotiate_recv(sock, &magic, sizeof(magic), 2) < 0)
+        goto fail;
+
+    if (ntohll(magic) != NBD_NEGOTIATION_MAGIC) {
+        ERROR("Not enough magic in negotiation(2) (%"PRIu64")\n",ntohll(magic));
+        goto fail;
+    }
+
+    if (tdnbd_negotiate_recv(sock, &size, sizeof(size), 3) < 0)
+        goto fail;
+
+    INFO("Got size: %"PRIu64"", ntohll(size));
+
+    driver->info.size = ntohll(size) >> SECTOR_SHIFT;
+    driver->info.sector_size = DEFAULT_SECTOR_SIZE;
+    driver->info.info = 0;
+
+    if (tdnbd_negotiate_recv(sock, &flags, sizeof(flags), 4) < 0)
+        goto fail;
+
+    INFO("Got flags: %"PRIu32"", ntohl(flags));
+
+    /* The padding may arrive in pieces; read until all of it is consumed. */
+    while (padbytes > 0) {
+        if (tdnbd_wait_read(sock) <= 0) {
+            ERROR("Timeout in nbd_negotiate");
+            goto fail;
+        }
+
+        rc = recv(sock, buffer, padbytes, 0);
+        /* 0 means the peer closed the connection; retrying would spin. */
+        if (rc <= 0) {
+            ERROR("Bad read in negotiation(5) (%d)\n",rc);
+            goto fail;
+        }
+        padbytes -= rc;
+    }
+
+    INFO("Successfully connected to NBD server");
+
+    fcntl(sock, F_SETFL, O_NONBLOCK);
+
+    return 0;
+
+fail:
+    close(sock);
+    return -1;
+}
+
+
+/*
+ * Open a TCP connection to prv->peer_ip:prv->port, store the socket in
+ * prv->socket and run the NBD handshake on it.
+ *
+ * Returns the result of tdnbd_nbd_negotiate() once connected. Returns -1
+ * on an earlier failure; the socket is then closed and prv->remote is
+ * freed and reset to NULL.
+ */
+int tdnbd_connect_import_session(struct tdnbd_data *prv, td_driver_t* driver)
+{
+    int sock;
+    int opt = 1;
+    int rc;
+
+    sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    if (sock < 0) {
+        ERROR("Could not create socket: %s\n", strerror(errno));
+        return -1;
+    }
+
+    rc = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt));
+    if (rc < 0) {
+        ERROR("Could not set TCP_NODELAY: %s\n", strerror(errno));
+        goto fail;
+    }
+
+    /* Allocate the structure itself, not the size of a pointer to it. */
+    prv->remote = calloc(1, sizeof(*prv->remote));
+    if (!prv->remote) {
+        ERROR("struct sockaddr_in malloc failure\n");
+        goto fail;
+    }
+    prv->remote->sin_family = AF_INET;
+    rc = inet_pton(AF_INET, prv->peer_ip, &(prv->remote->sin_addr.s_addr));
+    if (rc < 0) {
+        ERROR("Could not create inaddr: %s\n", strerror(errno));
+        goto fail;
+    }
+    if (rc == 0) {
+        ERROR("inet_pton parse error\n");
+        goto fail;
+    }
+    prv->remote->sin_port = htons(prv->port);
+
+    if (connect(sock, (struct sockaddr *)prv->remote, sizeof(*prv->remote)) < 0) {
+        ERROR("Could not connect to peer: %s\n", strerror(errno));
+        goto fail;
+    }
+
+    prv->socket = sock;
+
+    return tdnbd_nbd_negotiate(prv, driver);
+
+fail:
+    free(prv->remote);
+    prv->remote = NULL;
+    close(sock);
+    return -1;
+}
+
+/* -- interface -- */
+
+static int tdnbd_close(td_driver_t*);
+
+/*
+ * Open an NBD image. name is either "<ipv4>:<port>", in which case a new
+ * TCP connection is made, or the name of a file descriptor that was passed
+ * to this process earlier. After the handshake the reader event is
+ * registered on the socket.
+ *
+ * Returns 0 on success and -1 on failure. On failure the private strings
+ * are freed and prv->socket is reset to -1.
+ */
+static int tdnbd_open(td_driver_t* driver, const char* name, td_flag_t flags)
+{
+    struct tdnbd_data *prv;
+    char peer_ip[256];
+    int port;
+    int rc;
+    int i;
+
+    driver->info.sector_size = 512;
+    driver->info.info = 0;
+
+    prv = (struct tdnbd_data *)driver->data;
+    memset(prv, 0, sizeof(struct tdnbd_data));
+
+    INFO("Opening nbd export to %s (flags=%x)\n", name, flags);
+
+    prv->writer_event_id = -1;
+    prv->reader_event_id = -1;
+    prv->socket = -1;
+    INIT_LIST_HEAD(&prv->sent_reqs);
+    INIT_LIST_HEAD(&prv->pending_reqs);
+    INIT_LIST_HEAD(&prv->free_reqs);
+    for (i = 0; i < MAX_NBD_REQS; i++) {
+        INIT_LIST_HEAD(&prv->requests[i].queue);
+        prv->requests[i].timeout_event = -1;
+        list_add(&prv->requests[i].queue, &prv->free_reqs);
+    }
+    prv->nr_free_count = MAX_NBD_REQS;
+    prv->cur_reply_qio.buffer = (char *)&prv->current_reply;
+    prv->cur_reply_qio.len = sizeof(struct nbd_reply);
+
+    rc = sscanf(name, "%255[^:]:%d", peer_ip, &port);
+    if (rc == 2) {
+        prv->peer_ip = malloc(strlen(peer_ip) + 1);
+        if (!prv->peer_ip) {
+            ERROR("Failure to malloc for NBD destination");
+            goto fail;
+        }
+        strcpy(prv->peer_ip, peer_ip);
+        prv->port = port;
+        prv->name = NULL;
+        INFO("Export peer=%s port=%d\n", prv->peer_ip, prv->port);
+        /* Every failure inside has already closed the socket. */
+        if (tdnbd_connect_import_session(prv, driver) < 0)
+            goto fail;
+    } else {
+        prv->socket = tdnbd_retreive_passed_fd(name);
+        if (prv->socket < 0) {
+            ERROR("Couldn't find fd named: %s",name);
+            goto fail;
+        }
+        INFO("Found passed fd. Connecting...");
+        prv->remote = NULL;
+        prv->peer_ip = NULL;
+        prv->port = -1;
+        /*
+         * tdnbd_close() uses name to tell a passed fd from its own
+         * connection, so a failed copy must not go unnoticed.
+         */
+        prv->name = strdup(name);
+        if (!prv->name) {
+            ERROR("Failure to duplicate fd name");
+            goto fail_close;
+        }
+        /* A failed negotiation has already closed the socket. */
+        if (tdnbd_nbd_negotiate(prv, driver) < 0) {
+            ERROR("Failed to negotiate");
+            goto fail;
+        }
+    }
+
+    prv->reader_event_id =
+        tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                      prv->socket, 0,
+                                      tdnbd_reader_cb,
+                                      (void *)prv);
+    /* NOTE(review): a negative id is assumed to signal failure, as for writer_event_id - confirm. */
+    if (prv->reader_event_id < 0) {
+        ERROR("Failed to register reader event: %d", prv->reader_event_id);
+        goto fail_close;
+    }
+
+    prv->flags = flags;
+    prv->closed = 0;
+
+    if (flags & TD_OPEN_SECONDARY) {
+        INFO("Opening in secondary mode: Read requests will be forwarded");
+    }
+
+    return 0;
+
+fail_close:
+    close(prv->socket);
+fail:
+    prv->socket = -1;
+    free(prv->remote);
+    prv->remote = NULL;
+    free(prv->peer_ip);
+    prv->peer_ip = NULL;
+    free(prv->name);
+    prv->name = NULL;
+    return -1;
+}
+
+/*
+ * Close an NBD image. Unless the connection was already declared dead, a
+ * disconnect request is written in blocking mode. A socket that arrived as
+ * a passed fd is then stashed again under its name; any other socket is
+ * closed. All heap allocations of the private data are released on every
+ * path.
+ *
+ * Always returns 0.
+ */
+static int tdnbd_close(td_driver_t* driver)
+{
+    struct tdnbd_data *prv = (struct tdnbd_data *)driver->data;
+    td_request_t treq;
+    int fd_stashed = 0;
+
+    memset(&treq, 0, sizeof(treq));
+
+    if (prv->closed == 3) {
+        INFO("NBD close: already decided that the connection is dead.");
+    } else {
+        /* Send a close packet */
+
+        INFO("Sending disconnect request");
+        tdnbd_queue_request(prv, NBD_CMD_DISC, 0, 0, 0, treq, 0);
+
+        INFO("Switching socket to blocking IO mode");
+        fcntl(prv->socket, F_SETFL, fcntl(prv->socket, F_GETFL) & ~O_NONBLOCK);
+
+        INFO("Writing disconnection request");
+        tdnbd_writer_cb(0, 0, prv);
+
+        INFO("Written");
+
+        /*
+         * Writing the disconnect normally ends in tdnbd_disable(). If it
+         * could not be queued, make sure the events are unregistered
+         * before the socket goes away.
+         */
+        if (prv->closed != 3)
+            tdnbd_disable(prv, EIO);
+
+        if (prv->name) {
+            tdnbd_stash_passed_fd(prv->socket, prv->name, 0);
+            fd_stashed = 1;
+        }
+    }
+
+    if (!fd_stashed && prv->socket >= 0)
+        close(prv->socket);
+    prv->socket = -1;
+
+    free(prv->peer_ip);
+    prv->peer_ip = NULL;
+    free(prv->name);
+    prv->name = NULL;
+    free(prv->remote);
+    prv->remote = NULL;
+
+    return 0;
+}
+
+/*
+ * Read handler: in secondary mode the request is forwarded, otherwise it
+ * is queued as an NBD read of the corresponding byte range.
+ */
+static void tdnbd_queue_read(td_driver_t* driver, td_request_t treq)
+{
+    struct tdnbd_data *prv = (struct tdnbd_data *)driver->data;
+    uint64_t byte_offset;
+    int byte_count;
+
+    if (prv->flags & TD_OPEN_SECONDARY) {
+        td_forward_request(treq);
+        return;
+    }
+
+    byte_count = treq.secs * driver->info.sector_size;
+    byte_offset = treq.sec * (uint64_t)driver->info.sector_size;
+
+    tdnbd_queue_request(prv, NBD_CMD_READ, byte_offset, treq.buf, byte_count, treq, 0);
+}
+
+/*
+ * Write handler: queue the request as an NBD write of the corresponding
+ * byte range.
+ */
+static void tdnbd_queue_write(td_driver_t* driver, td_request_t treq)
+{
+    struct tdnbd_data *prv;
+    uint64_t byte_offset;
+    int byte_count;
+
+    prv = (struct tdnbd_data *)driver->data;
+    byte_count = treq.secs * driver->info.sector_size;
+    byte_offset = treq.sec * (uint64_t)driver->info.sector_size;
+
+    tdnbd_queue_request(prv, NBD_CMD_WRITE, byte_offset, treq.buf, byte_count, treq, 0);
+}
+
+static int tdnbd_get_parent_id(td_driver_t* driver, td_disk_id_t* id)
+{ /* Parent lookup callback: ignores both arguments and always returns TD_NO_PARENT */
+ return TD_NO_PARENT;
+}
+
+static int tdnbd_validate_parent(td_driver_t *driver,
+ td_driver_t *parent, td_flag_t flags)
+{ /* Parent validation callback: rejects every parent by always returning -EINVAL */
+ return -EINVAL;
+}
+
+struct tap_disk tapdisk_nbd = { /* Driver descriptor binding the tdnbd_* callbacks of this file */
+ .disk_type = "tapdisk_nbd",
+ .private_data_size = sizeof(struct tdnbd_data), /* presumably the size of the area handed over as driver->data - confirm */
+ .flags = 0,
+ .td_open = tdnbd_open,
+ .td_close = tdnbd_close,
+ .td_queue_read = tdnbd_queue_read, /* forwards in secondary mode, otherwise queues an NBD read */
+ .td_queue_write = tdnbd_queue_write, /* queues an NBD write */
+ .td_get_parent_id = tdnbd_get_parent_id, /* always TD_NO_PARENT */
+ .td_validate_parent = tdnbd_validate_parent, /* always -EINVAL */
+};
34 drivers/tapdisk-control.c
View
@@ -54,6 +54,7 @@
#include "tapdisk-disktype.h"
#include "tapdisk-stats.h"
#include "tapdisk-control.h"
+#include "tapdisk-nbdserver.h"
#define TD_CTL_MAX_CONNECTIONS 10
#define TD_CTL_SOCK_BACKLOG 32
@@ -63,6 +64,7 @@
#define DBG(_f, _a...) tlog_syslog(TLOG_DBG, _f, ##_a)
#define ERR(err, _f, _a...) tlog_error(err, _f, ##_a)
+#define INFO(_f, _a...) tlog_syslog(TLOG_INFO, "control: " _f, ##_a)
#define ASSERT(_p) \
if (!(_p)) { \
@@ -757,6 +759,16 @@ tapdisk_control_open_image(struct tapdisk_ctl_conn *conn,
goto fail_close;
}
+ /* For now, let's do this automatically on all 'open' calls.
+ In the future, we'll probably want a separate call to
+ start the NBD server. */
+ err = tapdisk_vbd_start_nbdserver(vbd);
+
+ if (err) {
+ EPRINTF("failed to start nbdserver: %d\n",err);
+ goto fail_close;
+ }
+
err = 0;
out:
@@ -802,6 +814,10 @@ tapdisk_control_close_image(struct tapdisk_ctl_conn *conn,
goto out;
}
+ if(vbd->nbdserver) {
+ tapdisk_nbdserver_pause(vbd->nbdserver);
+ }
+
do {
err = tapdisk_blktap_remove_device(vbd->tap);
@@ -829,6 +845,11 @@ tapdisk_control_close_image(struct tapdisk_ctl_conn *conn,
if (err)
goto out;
+ if(vbd->nbdserver) {
+ tapdisk_nbdserver_free(vbd->nbdserver);
+ vbd->nbdserver = NULL;
+ }
+
tapdisk_vbd_close_vdi(vbd);
/* NB. vbd->name free should probably belong into close_vdi,
@@ -898,12 +919,25 @@ tapdisk_control_resume_vbd(struct tapdisk_ctl_conn *conn,
response.type = TAPDISK_MESSAGE_RESUME_RSP;
+ INFO("Resuming. flags=0x%08x secondary=%p\n", request->u.params.flags, request->u.params.secondary);
+
vbd = tapdisk_server_get_vbd(request->cookie);
if (!vbd) {
err = -EINVAL;
goto out;
}
+ if (request->u.params.flags & TAPDISK_MESSAGE_FLAG_SECONDARY) {
+ char *name = strdup(request->u.params.secondary);
+ if (!name) {
+ err = -errno;
+ goto out;
+ }
+ INFO("Resuming with secondary '%s'\n", name);
+ vbd->secondary_name = name;
+ vbd->flags |= TD_OPEN_SECONDARY;
+ }
+
if (request->u.params.path[0])
desc = request->u.params.path;
18 drivers/tapdisk-disktype.c
View
@@ -127,6 +127,18 @@ static const disk_info_t valve_disk = {
DISK_TYPE_FILTER,
};
+/* Disk type descriptor for "export"; the three initializers are
+ * presumably name, description and flags -- confirm against disk_info_t. */
+static const disk_info_t export_disk = {
+ "export",
+ "export to a remote VDI",
+ 0,
+};
+
+/* Disk type descriptor for "nbd"; the three initializers are
+ * presumably name, description and flags -- confirm against disk_info_t. */
+static const disk_info_t nbd_disk = {
+ "nbd",
+ "export to a NBD server",
+ 0,
+};
+
const disk_info_t *tapdisk_disk_types[] = {
[DISK_TYPE_AIO] = &aio_disk,
[DISK_TYPE_SYNC] = &sync_disk,
@@ -143,6 +155,8 @@ const disk_info_t *tapdisk_disk_types[] = {
[DISK_TYPE_VALVE] = &valve_disk,
[DISK_TYPE_LLPCACHE] = &llpcache_disk,
[DISK_TYPE_LLECACHE] = &llecache_disk,
+ [DISK_TYPE_EXPORT] = &export_disk,
+ [DISK_TYPE_NBD] = &nbd_disk,
0,
};
@@ -166,6 +180,8 @@ extern struct tap_disk tapdisk_lcache;
extern struct tap_disk tapdisk_llpcache;
extern struct tap_disk tapdisk_llecache;
extern struct tap_disk tapdisk_valve;
+extern struct tap_disk tapdisk_export;
+extern struct tap_disk tapdisk_nbd;
const struct tap_disk *tapdisk_disk_drivers[] = {
[DISK_TYPE_AIO] = &tapdisk_aio,
@@ -188,6 +204,8 @@ const struct tap_disk *tapdisk_disk_drivers[] = {
[DISK_TYPE_LLPCACHE] = &tapdisk_llpcache,
[DISK_TYPE_LLECACHE] = &tapdisk_llecache,
[DISK_TYPE_VALVE] = &tapdisk_valve,
+ [DISK_TYPE_EXPORT] = &tapdisk_export,
+ [DISK_TYPE_NBD] = &tapdisk_nbd,
0,
};
2  drivers/tapdisk-disktype.h
View
@@ -44,6 +44,8 @@
#define DISK_TYPE_LLECACHE 12
#define DISK_TYPE_LLPCACHE 13
#define DISK_TYPE_VALVE 14
+#define DISK_TYPE_EXPORT 15
+#define DISK_TYPE_NBD 16
#define DISK_TYPE_NAME_MAX 32
228 drivers/tapdisk-fdreceiver.c
View
@@ -0,0 +1,228 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <sys/wait.h>
+#include <sys/un.h>
+
+#include "tapdisk.h"
+#include "tapdisk-fdreceiver.h"
+#include "tapdisk-server.h"
+#include "scheduler.h"
+
+#define UNIX_BUFFER_SIZE 16384
+
+#define INFO(_f, _a...) tlog_syslog(TLOG_INFO, "nbd: " _f, ##_a)
+#define ERROR(_f, _a...) tlog_syslog(TLOG_WARN, "nbd: " _f, ##_a)
+
+
+/*
+ * Event callback for the transient client connection of an fd receiver.
+ *
+ * Reads one message from fdreceiver->client_fd, extracts a file
+ * descriptor passed as SCM_RIGHTS ancillary data (if any), then tears
+ * down the client connection and hands the descriptor plus the message
+ * text to fdreceiver->callback.
+ *
+ * @id, @mode: unused.
+ * @data:      the struct td_fdreceiver this event was registered for.
+ *
+ * The callback is invoked with fd == -1 when no descriptor was received
+ * (end of stream or missing/malformed control data). If recvmsg() itself
+ * fails, the function returns without touching the connection.
+ */
+void
+td_fdreceiver_recv_fd(event_id_t id, char mode, void *data)
+{
+	struct td_fdreceiver *fdreceiver = data;
+	char iobuf[UNIX_BUFFER_SIZE];
+	/* Union guarantees the control buffer is aligned for struct cmsghdr */
+	union {
+		char buf[CMSG_SPACE(sizeof(int))];
+		struct cmsghdr align;
+	} ctrl;
+	struct sockaddr_un unix_socket_name;
+	struct msghdr msg;
+	struct iovec vec;
+	struct cmsghdr *cmsg;
+	ssize_t ret;
+	int fd = -1;
+
+	memset(iobuf, 0, sizeof(iobuf));
+	memset(&ctrl, 0, sizeof(ctrl));
+	memset(&msg, 0, sizeof(msg));
+
+	/* Receive at most sizeof(iobuf) - 1 bytes so that the zeroed buffer
+	   is always NUL-terminated when used as a string below */
+	vec.iov_base = iobuf;
+	vec.iov_len = sizeof(iobuf) - 1;
+
+	msg.msg_name = &unix_socket_name;
+	msg.msg_namelen = sizeof(unix_socket_name);
+	msg.msg_iov = &vec;
+	msg.msg_iovlen = 1;
+	msg.msg_control = ctrl.buf;
+	msg.msg_controllen = sizeof(ctrl.buf);
+
+	ret = recvmsg(fdreceiver->client_fd, &msg, 0);
+
+	if (ret == -1) {
+		ERROR("Failed to receive the message: %d", errno);
+		return;
+	}
+
+	if (ret > 0 && msg.msg_controllen > 0) {
+		/* CMSG_FIRSTHDR yields NULL when the control data is too
+		   short to hold a header */
+		cmsg = CMSG_FIRSTHDR(&msg);
+		if (cmsg != NULL &&
+		    cmsg->cmsg_level == SOL_SOCKET &&
+		    cmsg->cmsg_type == SCM_RIGHTS &&
+		    cmsg->cmsg_len >= CMSG_LEN(sizeof(fd))) {
+			/* CMSG_DATA is not guaranteed to be int-aligned */
+			memcpy(&fd, CMSG_DATA(cmsg), sizeof(fd));
+		} else {
+			ERROR("Failed to recieve a file descriptor");
+		}
+	}
+
+	INFO("Recieved fd with message: %s",iobuf);
+
+	/* We're done with this connection, it was only transiently used to
+	   connect the client */
+	close(fdreceiver->client_fd);
+	fdreceiver->client_fd = -1;
+
+	tapdisk_server_unregister_event(fdreceiver->client_event_id);
+	fdreceiver->client_event_id = -1;
+
+	/* It is the responsibility of this callback function to arrange
+	   that the fd is eventually closed */
+	fdreceiver->callback(fd, iobuf, fdreceiver->callback_data);
+}
+
+/*
+ * Event callback for the listening unix domain socket of an fd receiver.
+ *
+ * Accepts one pending connection. Only a single client connection is
+ * handled at a time: if one is already active the new connection is
+ * closed immediately. Otherwise the connection is stored in
+ * fdreceiver->client_fd and td_fdreceiver_recv_fd is registered to read
+ * from it.
+ *
+ * @id, @mode: unused.
+ * @data:      the struct td_fdreceiver this event was registered for.
+ */
+void
+td_fdreceiver_accept_fd(event_id_t id, char mode, void *data)
+{
+	struct sockaddr_storage their_addr;
+	socklen_t sin_size = sizeof(their_addr);
+	struct td_fdreceiver *fdreceiver = data;
+	int new_fd;
+
+	INFO("Unix domain socket is ready to accept");
+
+	new_fd = accept(fdreceiver->fd, (struct sockaddr *)&their_addr, &sin_size);
+	if (new_fd < 0) {
+		/* Never store or register an invalid descriptor */
+		ERROR("td_fdreceiver_accept_fd: accept failed (errno=%d)", errno);
+		return;
+	}
+
+	if (fdreceiver->client_fd != -1) {
+		ERROR("td_fdreceiver_accept_fd: an only cope with one connection at once to the unix domain socket!");
+		close(new_fd);
+		return;
+	}
+
+	fdreceiver->client_fd = new_fd;
+
+	fdreceiver->client_event_id =
+		tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+					      fdreceiver->client_fd, 0,
+					      td_fdreceiver_recv_fd,
+					      fdreceiver);
+
+	if (fdreceiver->client_event_id < 0) {
+		ERROR("td_fdreceiver_accept_fd: failed to register event (errno=%d)",errno);
+		close(new_fd);
+		fdreceiver->client_fd = -1;
+	}
+}
+
+/*
+ * Shut an fd receiver down: unregister its events, close the client and
+ * listening sockets, and unlink and free the socket path.
+ *
+ * Every released field is reset (-1 / NULL), so calling this twice on
+ * the same receiver is harmless. A NULL receiver is ignored.
+ *
+ * Note that the struct td_fdreceiver itself is not freed here.
+ */
+void td_fdreceiver_stop(struct td_fdreceiver *fdreceiver)
+{
+	if (fdreceiver == NULL)
+		return;
+
+	if (fdreceiver->client_event_id >= 0) {
+		tapdisk_server_unregister_event(fdreceiver->client_event_id);
+		fdreceiver->client_event_id = -1;
+	}
+
+	if (fdreceiver->client_fd >= 0) {
+		close(fdreceiver->client_fd);
+		fdreceiver->client_fd = -1;
+	}
+
+	if (fdreceiver->fd_event_id >= 0) {
+		tapdisk_server_unregister_event(fdreceiver->fd_event_id);
+		fdreceiver->fd_event_id = -1;
+	}
+
+	if (fdreceiver->fd >= 0) {
+		close(fdreceiver->fd);
+		fdreceiver->fd = -1;
+	}
+
+	if (fdreceiver->path != NULL) {
+		unlink(fdreceiver->path);
+		free(fdreceiver->path);
+		fdreceiver->path = NULL;
+	}
+}
+
+
+/*
+ * Create an fd receiver listening on the unix domain socket @path.
+ *
+ * @path:     filesystem path of the socket. Anything already present at
+ *            that path is unlinked first.
+ * @callback: invoked by td_fdreceiver_recv_fd with the received fd and
+ *            message once a client has passed a descriptor.
+ * @data:     opaque pointer handed back to @callback.
+ *
+ * Returns the new receiver, or NULL on any failure (allocation, path too
+ * long for sockaddr_un, socket/bind/listen error, event registration).
+ * On failure every resource acquired so far is released again.
+ */
+struct td_fdreceiver *
+td_fdreceiver_start(char *path, fd_cb_t callback, void *data)
+{
+	struct td_fdreceiver *fdreceiver;
+	struct sockaddr_un local;
+	int s = -1;	/* signed, so that failure (-1) is detectable */
+	int bound = 0;
+	int n;
+
+	fdreceiver = malloc(sizeof(*fdreceiver));
+	if (!fdreceiver) {
+		ERROR("td_fdreceiver_start: error allocating memory for fdreceiver (path=%s)",path);
+		goto error;
+	}
+
+	fdreceiver->fd = -1;
+	fdreceiver->fd_event_id = -1;
+	fdreceiver->client_fd = -1;
+	fdreceiver->client_event_id = -1;
+	fdreceiver->callback = callback;
+	fdreceiver->callback_data = data;
+
+	fdreceiver->path = strdup(path);
+	if (!fdreceiver->path) {
+		ERROR("td_fdreceiver_start: error allocating memory for path (path=%s)",path);
+		goto error;
+	}
+
+	memset(&local, 0, sizeof(local));
+	local.sun_family = AF_UNIX;
+	n = snprintf(local.sun_path, sizeof(local.sun_path), "%s", path);
+	if (n < 0 || (size_t)n >= sizeof(local.sun_path)) {
+		/* Binding a silently truncated path would create the socket
+		   somewhere the caller does not expect */
+		ERROR("td_fdreceiver_start: socket path too long (path=%s)",path);
+		goto error;
+	}
+
+	/* N.b. here we unlink anything that was there before - be very
+	   careful with the paths you pass to this function! */
+	unlink(local.sun_path);
+
+	s = socket(AF_UNIX, SOCK_STREAM, 0);
+	if (s < 0) {
+		ERROR("td_fdreceiver_start: error creating socket (path=%s)",path);
+		goto error;
+	}
+
+	if (bind(s, (struct sockaddr *)&local, sizeof(local)) < 0) {
+		ERROR("td_fdreceiver_start: error binding (path=%s)",path);
+		goto error;
+	}
+	bound = 1;
+
+	if (listen(s, 5) < 0) {
+		ERROR("td_fdreceiver_start: error listening (path=%s)",path);
+		goto error;
+	}
+
+	fdreceiver->fd = s;
+
+	fdreceiver->fd_event_id =
+		tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+					      fdreceiver->fd, 0,
+					      td_fdreceiver_accept_fd,
+					      fdreceiver);
+
+	if (fdreceiver->fd_event_id < 0) {
+		ERROR("td_fdreceiver_start: error registering event (path=%s)",path);
+		goto error;
+	}
+
+	INFO("Set up local unix domain socket on path '%s'",path);
+
+	return fdreceiver;
+
+error:
+	if (s >= 0)
+		close(s);
+
+	/* Do not leave a dead socket node behind */
+	if (bound)
+		unlink(local.sun_path);
+
+	if (fdreceiver) {
+		free(fdreceiver->path);
+		free(fdreceiver);
+	}
+
+	return NULL;
+}
+
19 drivers/tapdisk-fdreceiver.h
View
@@ -0,0 +1,19 @@
+/* Unix domain socket fd receiver */
+
+#ifndef _TAPDISK_FDRECEIVER_H_
+#define _TAPDISK_FDRECEIVER_H_
+
+/*
+ * Callback invoked once a client has delivered a file descriptor.
+ * fd is the received descriptor (-1 if none arrived), msg the text that
+ * accompanied it, data the pointer given to td_fdreceiver_start().
+ * The callback is responsible for eventually closing fd.
+ */
+typedef void (*fd_cb_t) (int fd, char *msg, void *data);
+
+/* State of one fd receiver; -1 marks an unset fd or event id. */
+struct td_fdreceiver {
+	char *path;		/* heap copy of the socket path */
+
+	int fd;			/* listening unix domain socket */
+	int fd_event_id;	/* accept event registered on fd */
+
+	int client_fd;		/* currently connected client, if any */
+	int client_event_id;	/* receive event registered on client_fd */
+
+	fd_cb_t callback;
+	void *callback_data;
+};
+
+/* Start listening on path; returns NULL on failure. */
+struct td_fdreceiver *td_fdreceiver_start(char *path, fd_cb_t, void *data);
+
+/* Unregister events, close sockets, unlink and free the path. */
+void td_fdreceiver_stop(struct td_fdreceiver *);
+
+#endif /* _TAPDISK_FDRECEIVER_H_ */
88 drivers/tapdisk-nbd.h
View
@@ -0,0 +1,88 @@
+/*
+ * 1999 Copyright (C) Pavel Machek, pavel@ucw.cz. This code is GPL.
+ * 1999/11/04 Copyright (C) 1999 VMware, Inc. (Regis "HPReg" Duchesne)
+ * Made nbd_end_request() use the io_request_lock
+ * 2001 Copyright (C) Steven Whitehouse
+ * New nbd_end_request() for compatibility with new linux block
+ * layer code.
+ * 2003/06/24 Louis D. Langholtz <ldl@aros.net>
+ * Removed unneeded blksize_bits field from nbd_device struct.
+ * Cleanup PARANOIA usage & code.
+ * 2004/02/19 Paul Clements
+ * Removed PARANOIA, plus various cleanup and comments
+ */
+
+#ifndef LINUX_NBD_H
+#define LINUX_NBD_H
+
+//#include <linux/types.h>
+
+#define NBD_NEGOTIATION_MAGIC 0x00420281861253LL
+
+#define NBD_SET_SOCK _IO( 0xab, 0 )
+#define NBD_SET_BLKSIZE _IO( 0xab, 1 )
+#define NBD_SET_SIZE _IO( 0xab, 2 )
+#define NBD_DO_IT _IO( 0xab, 3 )
+#define NBD_CLEAR_SOCK _IO( 0xab, 4 )
+#define NBD_CLEAR_QUE _IO( 0xab, 5 )
+#define NBD_PRINT_DEBUG _IO( 0xab, 6 )
+#define NBD_SET_SIZE_BLOCKS _IO( 0xab, 7 )
+#define NBD_DISCONNECT _IO( 0xab, 8 )
+#define NBD_SET_TIMEOUT _IO( 0xab, 9 )
+#define NBD_SET_FLAGS _IO( 0xab, 10 )
+
+/* Command codes carried in the type field of struct nbd_request. */
+enum {
+ NBD_CMD_READ = 0,
+ NBD_CMD_WRITE = 1,
+ NBD_CMD_DISC = 2,
+ NBD_CMD_FLUSH = 3,
+ NBD_CMD_TRIM = 4
+};
+
+#define NBD_CMD_MASK_COMMAND 0x0000ffff
+#define NBD_CMD_FLAG_FUA (1<<16)
+
+/* values for flags field */
+#define NBD_FLAG_HAS_FLAGS (1 << 0) /* Flags are there */
+#define NBD_FLAG_READ_ONLY (1 << 1) /* Device is read-only */
+#define NBD_FLAG_SEND_FLUSH (1 << 2) /* Send FLUSH */
+#define NBD_FLAG_SEND_FUA (1 << 3) /* Send FUA (Force Unit Access) */
+#define NBD_FLAG_ROTATIONAL (1 << 4) /* Use elevator algorithm - rotational media */
+#define NBD_FLAG_SEND_TRIM (1 << 5) /* Send TRIM (discard) */
+
+#define nbd_cmd(req) ((req)->cmd[0])
+
+/* userspace doesn't need the nbd_device structure */
+
+/* These are sent over the network in the request/reply magic fields */
+
+#define NBD_REQUEST_MAGIC 0x25609513
+#define NBD_REPLY_MAGIC 0x67446698
+/* Do *not* use magics: 0x12560953 0x96744668. */
+
+#define __be32 uint32_t
+#define __be64 uint64_t
+
+
+/*
+ * This is the packet used for communication between client and
+ * server. All data are in network byte order.
+ *
+ * The struct is packed so that its in-memory layout matches the wire
+ * format exactly.
+ */
+struct nbd_request {
+ /* NBD_REQUEST_MAGIC */
+ __be32 magic;
+ /* one of the NBD_CMD_* codes */
+ __be32 type; /* == READ || == WRITE */
+ /* opaque to the server; echoed back in nbd_reply.handle */
+ char handle[8];
+ /* start offset of the request, in bytes */
+ __be64 from;
+ /* length of the request, in bytes */
+ __be32 len;
+} __attribute__ ((packed));
+
+/*
+ * This is the reply packet that nbd-server sends back to the client after
+ * it has completed an I/O request (or an error occurs).
+ *
+ * NOTE(review): unlike struct nbd_request this is not declared packed;
+ * the members (4 + 4 + 8 bytes) presumably need no padding -- confirm.
+ */
+struct nbd_reply {
+ /* NBD_REPLY_MAGIC */
+ __be32 magic;
+ __be32 error; /* 0 = ok, else error */
+ char handle[8]; /* handle you got from request */
+};
+#endif
669 drivers/tapdisk-nbdserver.c
View
@@ -0,0 +1,669 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <sys/wait.h>
+#include <sys/un.h>
+
+#include "tapdisk.h"
+#include "tapdisk-server.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-interface.h"
+#include "tapdisk-utils.h"
+#include "tapdisk-nbdserver.h"
+#include "tapdisk-fdreceiver.h"
+
+#include "tapdisk-nbd.h"
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#define NBD_SERVER_NUM_REQS TAPDISK_DATA_REQUESTS
+
+#define TAPDISK_NBDSERVER_LISTENING_SOCK_PATH "/var/run/blktap-control/nbdserver"
+#define TAPDISK_NBDSERVER_MAX_PATH_LEN 256
+
+/*
+ * Server
+ */
+
+#define INFO(_f, _a...) tlog_syslog(TLOG_INFO, "nbd: " _f, ##_a)
+#define ERROR(_f, _a...) tlog_syslog(TLOG_WARN, "nbd: " _f, ##_a)
+
+/* One in-flight NBD request of a client. */
+struct td_nbdserver_req {
+ /* request handed to tapdisk_vbd_queue_request() */
+ td_vbd_request_t vreq;
+ /* copy of the 8-byte NBD handle, zero padded; also used as vreq.name */
+ char id[16];
+ /* single I/O vector of the request; base is allocated per request */
+ struct td_iovec iov;
+};
+
+void
+tapdisk_nbdserver_disable_client(td_nbdserver_client_t *client);
+void
+tapdisk_nbdserver_clientcb(event_id_t id, char mode, void *data);
+int
+tapdisk_nbdserver_setup_listening_socket(td_nbdserver_t *server);
+int
+tapdisk_nbdserver_unpause(td_nbdserver_t *server);
+
+/*
+ * Pop a request slot from the client's free stack.
+ * Returns NULL when no slot is free.
+ */
+td_nbdserver_req_t *
+tapdisk_nbdserver_alloc_request(td_nbdserver_client_t *client)
+{
+	if (likely(client->n_reqs_free)) {
+		client->n_reqs_free--;
+		return client->reqs_free[client->n_reqs_free];
+	}
+
+	return NULL;
+}
+
+/*
+ * Push a request slot back onto the client's free stack.
+ * If the stack already holds n_reqs entries the slot is dropped and an
+ * error is logged.
+ */
+void
+tapdisk_nbdserver_free_request(td_nbdserver_client_t *client, td_nbdserver_req_t *req)
+{
+	if (client->n_reqs_free < client->n_reqs) {
+		client->reqs_free[client->n_reqs_free] = req;
+		client->n_reqs_free++;
+		return;
+	}
+
+	ERROR("Error, trying to free a client, but the free list is full! leaking!");
+}
+
+/*
+ * Release the client's request array, iovec array and free stack and
+ * reset the three pointers to NULL. Safe on pointers that are already
+ * NULL, since free(NULL) is a no-op.
+ */
+void
+tapdisk_nbdserver_reqs_free(td_nbdserver_client_t *client)
+{
+	free(client->reqs);
+	client->reqs = NULL;
+
+	free(client->iovecs);
+	client->iovecs = NULL;
+
+	free(client->reqs_free);
+	client->reqs_free = NULL;
+}
+
+/*
+ * Allocate the request pool of a client: n_reqs request slots, n_reqs
+ * iovecs and a free stack initially holding every slot.
+ *
+ * Returns 0 on success, -EINVAL if n_reqs is not positive, or a
+ * negative errno if an allocation fails. On failure nothing stays
+ * allocated and the three pool pointers are NULL.
+ */
+int
+tapdisk_nbdserver_reqs_init(td_nbdserver_client_t *client, int n_reqs)
+{
+	int i, err;
+
+	INFO("Reqs init");
+
+	/* Start from a known state so the failure path never frees
+	   whatever the caller happened to leave in these fields */
+	client->reqs = NULL;
+	client->iovecs = NULL;
+	client->reqs_free = NULL;
+
+	if (n_reqs <= 0)
+		return -EINVAL;
+
+	/* calloc() rejects a count * size product that would overflow */
+	client->reqs = calloc(n_reqs, sizeof(td_nbdserver_req_t));
+	if (!client->reqs) {
+		err = -errno;
+		goto fail;
+	}
+
+	client->iovecs = calloc(n_reqs, sizeof(struct td_iovec));
+	if (!client->iovecs) {
+		err = -errno;
+		goto fail;
+	}
+
+	client->reqs_free = calloc(n_reqs, sizeof(td_nbdserver_req_t*));
+	if (!client->reqs_free) {
+		err = -errno;
+		goto fail;
+	}
+
+	client->n_reqs = n_reqs;
+	client->n_reqs_free = 0;
+
+	for (i = 0; i < n_reqs; i++) {
+		client->reqs[i].vreq.iov = &client->iovecs[i];
+		tapdisk_nbdserver_free_request(client, &client->reqs[i]);
+	}
+
+	return 0;
+
+fail:
+	tapdisk_nbdserver_reqs_free(client);
+	return err;
+}
+
+/*
+ * Allocate a zeroed client, give it a pool of NBD_SERVER_NUM_REQS
+ * requests and link it into server->clients. The new client has no
+ * socket (client_fd == -1) and no registered event yet.
+ *
+ * Returns the client, or NULL if either allocation fails.
+ */
+td_nbdserver_client_t *
+tapdisk_nbdserver_alloc_client(td_nbdserver_t *server)
+{
+	td_nbdserver_client_t *c;
+	int rc;
+
+	INFO("Alloc client");
+
+	c = malloc(sizeof(td_nbdserver_client_t));
+	if (c == NULL) {
+		ERROR("Couldn't allocate client structure: %s",strerror(errno));
+		return NULL;
+	}
+
+	memset(c, 0, sizeof(td_nbdserver_client_t));
+
+	rc = tapdisk_nbdserver_reqs_init(c, NBD_SERVER_NUM_REQS);
+	if (rc < 0) {
+		ERROR("Couldn't allocate client reqs: %d",rc);
+		free(c);
+		return NULL;
+	}
+
+	c->client_fd = -1;
+	c->client_event_id = -1;
+	c->server = server;
+	INIT_LIST_HEAD(&c->clientlist);
+	list_add(&c->clientlist, &server->clients);
+
+	c->paused = 0;
+
+	return c;
+}
+
+/*
+ * Destroy a client: unregister its event if one is registered, unlink
+ * it from the server's client list, and free its request pool and the
+ * client structure. The client's socket is not closed here.
+ */
+void
+tapdisk_nbdserver_free_client(td_nbdserver_client_t *client)
+{
+	INFO("Free client");
+
+	if (client == NULL) {
+		ERROR("Attempt to free NULL pointer!");
+		return;
+	}
+
+	if (!(client->client_event_id < 0))
+		tapdisk_nbdserver_disable_client(client);
+
+	list_del(&client->clientlist);
+	tapdisk_nbdserver_reqs_free(client);
+	free(client);
+}
+
+/*
+ * Register the read event for a client's socket.
+ *
+ * Returns the event id on success. Returns -1 if the client already has
+ * an event registered or has no socket, and the negative result of the
+ * registration if that fails.
+ */
+int
+tapdisk_nbdserver_enable_client(td_nbdserver_client_t *client)
+{
+	int event_id;
+
+	INFO("Enable client");
+
+	if (client->client_event_id >= 0) {
+		ERROR("Attempting to enable an already-enabled client");
+		return -1;
+	}
+
+	if (client->client_fd < 0) {
+		ERROR("Attempting to register events on a closed client");
+		return -1;
+	}
+
+	event_id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+						 client->client_fd, 0,
+						 tapdisk_nbdserver_clientcb,
+						 client);
+	client->client_event_id = event_id;
+
+	if (event_id < 0)
+		ERROR("Error registering events on client: %d",client->client_event_id);
+
+	return event_id;
+}
+
+/*
+ * Unregister the client's read event and mark it as unregistered.
+ * Logs an error and does nothing if no event is registered.
+ */
+void
+tapdisk_nbdserver_disable_client(td_nbdserver_client_t *client)
+{
+	INFO("Disable client");
+
+	if (client->client_event_id >= 0) {
+		tapdisk_server_unregister_event(client->client_event_id);
+		client->client_event_id = -1;
+		return;
+	}
+
+	ERROR("Attempting to disable an already-disabled client");
+}
+
+/*
+ * Return a pointer to the address member inside a socket address:
+ * sin_addr for AF_INET, sin6_addr for every other family.
+ */
+void *get_in_addr(struct sockaddr *sa)
+{
+	struct sockaddr_in *v4;
+	struct sockaddr_in6 *v6;
+
+	if (sa->sa_family != AF_INET) {
+		v6 = (struct sockaddr_in6 *)sa;
+		return &v6->sin6_addr;
+	}
+
+	v4 = (struct sockaddr_in *)sa;
+	return &v4->sin_addr;
+}
+
+
+/*
+ * Write exactly len bytes from buf to fd, retrying after short sends.
+ * Returns 0 on success, or the failing send() result (<= 0).
+ */
+static int
+tapdisk_nbdserver_send_all(int fd, const void *buf, int len)
+{
+	const char *pos = buf;
+	int sent;
+
+	while (len > 0) {
+		sent = send(fd, pos, len, 0);
+		if (sent <= 0) {
+			ERROR("Short send or error in callback: %d",sent);
+			return sent;
+		}
+
+		pos += sent;
+		len -= sent;
+	}
+
+	return 0;
+}
+
+/*
+ * Completion callback for requests queued by tapdisk_nbdserver_clientcb.
+ *
+ * Sends the NBD reply header for the request and, for a successful read,
+ * the data that was read. The request's data buffer is then freed and
+ * the request slot returned to the client's pool.
+ *
+ * @vreq:  the completed request, embedded in a td_nbdserver_req_t.
+ * @error: completion status, forwarded in the reply's error field.
+ * @token: the td_nbdserver_client_t that issued the request.
+ * @final: unused.
+ */
+void
+__tapdisk_nbdserver_request_cb(td_vbd_request_t *vreq, int error,
+			       void *token, int final)
+{
+	td_nbdserver_client_t *client = token;
+	td_nbdserver_req_t *req = containerof(vreq, td_nbdserver_req_t, vreq);
+	struct nbd_reply reply;
+
+	reply.magic = htonl(NBD_REPLY_MAGIC);
+	reply.error = htonl(error);
+	memcpy(reply.handle, req->id, sizeof(reply.handle));
+
+	if (client->client_fd < 0) {
+		ERROR("Finishing request for client that has disappeared");
+		goto finish;
+	}
+
+	/* A partially written header would desynchronise the stream, so
+	   the header goes through the same retry loop as the payload */
+	if (tapdisk_nbdserver_send_all(client->client_fd, &reply,
+				       sizeof(reply)) != 0)
+		goto finish;
+
+	/* In the NBD protocol read data follows the reply only when the
+	   reply reports success */
+	if (vreq->op == TD_OP_READ && error == 0)
+		tapdisk_nbdserver_send_all(client->client_fd, vreq->iov->base,
+					   vreq->iov->secs << SECTOR_SHIFT);
+
+finish:
+	free(vreq->iov->base);
+	tapdisk_nbdserver_free_request(client,req);
+}
+
+void
+tapdisk_nbdserver_newclient_fd(td_nbdserver_t *server, int new_fd);
+
+/*
+ * Event callback: a client socket is readable.
+ *
+ * Reads one NBD request header, validates it, and
+ *  - for NBD_CMD_READ / NBD_CMD_WRITE allocates a 512-byte aligned data
+ *    buffer (receiving the payload first for writes) and queues the
+ *    request on the server's VBD;
+ *  - for NBD_CMD_DISC frees the client and re-runs the negotiation on
+ *    the same socket.
+ *
+ * Any error (no free request slot, socket error or EOF, bad magic,
+ * unsupported command, invalid or unaligned offset/length, allocation or
+ * queueing failure) destroys the client.
+ *
+ * @id, @mode: unused.
+ * @data:      the td_nbdserver_client_t the event was registered for.
+ */
+void
+tapdisk_nbdserver_clientcb(event_id_t id, char mode, void *data)
+{
+	td_nbdserver_client_t *client = data;
+	td_nbdserver_t *server = client->server;
+	int fd = client->client_fd;
+	td_nbdserver_req_t *req;
+	td_vbd_request_t *vreq;
+	struct nbd_request request;
+	char *ptr;
+	int rc, len, hdrlen, n;
+	int outstanding;
+
+	req = tapdisk_nbdserver_alloc_request(client);
+	if (req == NULL) {
+		ERROR("Couldn't allocate request in clientcb - killing client");
+		goto fail;
+	}
+
+	vreq = &req->vreq;
+
+	memset(req, 0, sizeof(td_nbdserver_req_t));
+
+	/* Read the request the client has sent */
+	hdrlen = sizeof(struct nbd_request);
+
+	n = 0;
+	ptr = (char *) &request;
+	while (n < hdrlen) {
+		rc = recv(fd, ptr + n, hdrlen - n, 0);
+		if (rc == 0) {
+			INFO("Client closed connection");
+			goto fail;
+		}
+		if (rc < 0) {
+			ERROR("Bad return in nbdserver_clientcb. Closing connection");
+			goto fail;
+		}
+		n += rc;
+	}
+
+	if (request.magic != htonl(NBD_REQUEST_MAGIC)) {
+		ERROR("Not enough magic");
+		goto fail;
+	}
+
+	request.from = ntohll(request.from);
+	request.type = ntohl(request.type);
+	len = ntohl(request.len);
+
+	/* Decide on the operation before allocating anything, so that the
+	   disconnect and unsupported-command paths have nothing to leak */
+	switch (request.type) {
+	case NBD_CMD_READ:
+		vreq->op = TD_OP_READ;
+		break;
+	case NBD_CMD_WRITE:
+		vreq->op = TD_OP_WRITE;
+		break;
+	case NBD_CMD_DISC:
+		INFO("Received close message. Sending reconnect header");
+		tapdisk_nbdserver_free_client(client);
+		INFO("About to send initial connection message");
+		tapdisk_nbdserver_newclient_fd(server, fd);
+		INFO("Sent");
+		return;
+	default:
+		ERROR("Unsupported operation: 0x%x",request.type);
+		goto fail;
+	}
+
+	/* len is signed: wire lengths of 2^31 and above show up negative */
+	if (len <= 0) {
+		ERROR("Invalid request length (%d)",len);
+		goto fail;
+	}
+
+	/* Sector counts below are computed by shifting; an unaligned
+	   request would be silently truncated and desynchronise the stream */
+	if (((len & 0x1ff) != 0) || ((request.from & 0x1ff) != 0)) {
+		ERROR("Non sector-aligned request (%"PRIu64",%d)",request.from,len);
+		goto fail;
+	}
+
+	memcpy(req->id, request.handle, sizeof(request.handle));
+
+	/* posix_memalign() returns 0 or a positive error number, never -1 */
+	rc = posix_memalign(&req->iov.base, 512, len);
+	if (rc != 0) {
+		ERROR("posix_memalign failed (%d)",rc);
+		req->iov.base = NULL;
+		goto fail;
+	}
+
+	vreq->sec = request.from >> SECTOR_SHIFT;
+	vreq->iovcnt = 1;
+	vreq->iov = &req->iov;
+	vreq->iov->secs = len >> SECTOR_SHIFT;
+	vreq->token = client;
+	vreq->cb = __tapdisk_nbdserver_request_cb;
+	vreq->name = req->id;
+	vreq->vbd = server->vbd;
+
+	if (vreq->op == TD_OP_WRITE) {
+		/* The write payload follows the header on the wire */
+		n = 0;
+		while (n < len) {
+			rc = recv(fd, (char *)vreq->iov->base + n, len - n, 0);
+			if (rc <= 0) {
+				ERROR("Short send or error in callback: %d",rc);
+				goto fail_free_buf;
+			}
+
+			n += rc;
+		}
+	}
+
+	rc = tapdisk_vbd_queue_request(server->vbd, vreq);
+	if (rc) {
+		ERROR("tapdisk_vbd_queue_request failed: %d",rc);
+		/* NOTE(review): the buffer is deliberately not freed here, as
+		   it is not visible whether a failed queue call may still
+		   invoke vreq->cb (which frees it) -- confirm */
+		goto fail;
+	}
+
+	return;
+
+fail_free_buf:
+	free(req->iov.base);
+fail:
+	/* Requests other than ours that are still in flight keep a pointer
+	   to this client and its socket. Only close the socket when there
+	   are none, so a completion can never write to a reused fd number.
+	   NOTE(review): freeing the client with requests in flight is
+	   pre-existing behaviour and looks unsafe -- confirm. */
+	outstanding = client->n_reqs - client->n_reqs_free - (req ? 1 : 0);
+
+	tapdisk_nbdserver_free_client(client);
+
+	if (outstanding <= 0)
+		close(fd);
+
+	return;
+}
+
+/*
+ * Start serving a freshly connected socket.
+ *
+ * Sends the 152-byte NBD negotiation block ("NBDMAGIC", the negotiation
+ * magic, the disk size in bytes, 32 bits of zero flags, zero padding),
+ * then allocates a client for the socket and registers its read event.
+ *
+ * On any failure (short write, client allocation, event registration)
+ * new_fd is closed and no client is left behind.
+ */
+void
+tapdisk_nbdserver_newclient_fd(td_nbdserver_t *server, int new_fd)
+{
+	enum { NBD_NEGOTIATION_LEN = 152 };
+	char buffer[256];
+	td_nbdserver_client_t *client;
+	uint64_t tmp64;
+	uint32_t tmp32;
+	int rc;
+
+	INFO("Got a new client!");
+
+	/* Spit out the NBD connection stuff */
+
+	memset(buffer, 0, NBD_NEGOTIATION_LEN);
+	memcpy(buffer, "NBDMAGIC", 8);
+	tmp64 = htonll(NBD_NEGOTIATION_MAGIC);
+	memcpy(buffer+8, &tmp64, sizeof(tmp64));
+	tmp64 = htonll(server->info.size * server->info.sector_size);
+	memcpy(buffer+16, &tmp64, sizeof(tmp64));
+	tmp32 = htonl(0);
+	memcpy(buffer+24, &tmp32, sizeof(tmp32));
+	/* bytes 28..151 remain zero */
+
+	rc = send(new_fd, buffer, NBD_NEGOTIATION_LEN, 0);
+
+	if (rc < NBD_NEGOTIATION_LEN) {
+		INFO("Short write in negotiation!");
+		/* The socket is unusable now; do not build a client on it */
+		close(new_fd);
+		return;
+	}
+
+	INFO("About to alloc client");
+	client = tapdisk_nbdserver_alloc_client(server);
+	INFO("Got an allocated client at %p",client);
+	if (client == NULL) {
+		ERROR("Error allocating client");
+		close(new_fd);
+		return;
+	}
+
+	client->client_fd = new_fd;
+	INFO("About to enable client");
+
+	if (tapdisk_nbdserver_enable_client(client) < 0) {
+		ERROR("Error enabling client");
+		tapdisk_nbdserver_free_client(client);
+		close(new_fd);
+		return;
+	}
+}
+
+/*
+ * fd receiver callback: a descriptor was passed to us over the unix
+ * domain socket. Logs the accompanying message and starts serving the
+ * descriptor as a new NBD client of the server given in data.
+ */
+void
+tapdisk_nbdserver_fdreceiver_cb(int fd, char *msg, void *data)
+{
+	INFO("Received fd with msg: %s",msg);
+
+	tapdisk_nbdserver_newclient_fd((td_nbdserver_t *)data, fd);
+}
+
+/*
+ * Event callback for the server's listening socket: accept one pending
+ * connection, log the peer address and start serving it as a new NBD
+ * client. Returns silently (after logging) if accept() fails.
+ *
+ * @id, @mode: unused.
+ * @data:      the td_nbdserver_t owning the listening socket.
+ */
+void
+tapdisk_nbdserver_newclient(event_id_t id, char mode, void *data)
+{
+	td_nbdserver_t *server = data;
+	struct sockaddr_storage peer;
+	socklen_t peer_len = sizeof(peer);
+	char peer_str[INET6_ADDRSTRLEN];
+	int client_fd;
+
+	INFO("About to accept (server->listening_fd=%d)",server->listening_fd);
+
+	client_fd = accept(server->listening_fd, (struct sockaddr *)&peer, &peer_len);
+	if (client_fd == -1) {
+		ERROR("accept (%s)", strerror(errno));
+		return;
+	}
+
+	inet_ntop(peer.ss_family,
+		  get_in_addr((struct sockaddr *)&peer),
+		  peer_str, sizeof peer_str);
+
+	INFO("server: got connection from %s\n", peer_str);
+
+	tapdisk_nbdserver_newclient_fd(server, client_fd);
+}
+
+/*
+ * Allocate an NBD server for a VBD and start its fd receiver on
+ * TAPDISK_NBDSERVER_LISTENING_SOCK_PATH followed by "<pid>.<vbd uuid>".
+ *
+ * The returned server has no TCP listening socket yet; see
+ * tapdisk_nbdserver_listen(). Returns NULL if allocation fails or the
+ * fd receiver cannot be started; nothing stays allocated in that case.
+ */
+td_nbdserver_t *
+tapdisk_nbdserver_alloc(td_vbd_t *vbd, td_disk_info_t info)
+{
+	td_nbdserver_t *server;
+	char fdreceiver_path[TAPDISK_NBDSERVER_MAX_PATH_LEN];
+
+	server = malloc(sizeof(*server));
+	if (!server) {
+		ERROR("Failed to allocate memory for nbdserver, errno=%d",errno);
+		return NULL;
+	}
+
+	memset(server, 0, sizeof(*server));
+	server->listening_fd = -1;
+	server->listening_event_id = -1;
+	INIT_LIST_HEAD(&server->clients);
+
+	server->vbd = vbd;
+	server->info = info;
+
+	snprintf(fdreceiver_path, TAPDISK_NBDSERVER_MAX_PATH_LEN, "%s%d.%d",
+		 TAPDISK_NBDSERVER_LISTENING_SOCK_PATH, getpid(),
+		 vbd->uuid);
+
+	server->fdreceiver = td_fdreceiver_start(fdreceiver_path,
+						 tapdisk_nbdserver_fdreceiver_cb, server);
+
+	if (!server->fdreceiver) {
+		ERROR("Error setting up fd receiver");
+		/* No listening socket or event exists at this point (both are
+		   still -1), so the server structure is all there is to undo */
+		free(server);
+		return NULL;
+	}
+
+	return server;
+}
+
+/*
+ * Open a TCP listening socket on the given port (wildcard address) and
+ * register it with the scheduler via tapdisk_nbdserver_unpause().
+ *
+ * Returns 0 on success and -1 on any failure. On failure no socket is
+ * left open and server->listening_fd is -1.
+ */
+int
+tapdisk_nbdserver_listen(td_nbdserver_t *server, int port)
+{
+	struct addrinfo hints, *servinfo, *p;
+	char portstr[10];
+	int fd = -1;
+	int yes = 1;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+
+	snprintf(portstr, sizeof(portstr), "%d", port);
+
+	if (getaddrinfo(NULL, portstr, &hints, &servinfo) != 0) {
+		ERROR("Failed to getaddrinfo");
+		return -1;
+	}
+
+	for (p = servinfo; p != NULL; p = p->ai_next) {
+		/* The socket must match the family of the address we are
+		   about to bind, which with AF_UNSPEC may be IPv4 or IPv6 */
+		fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
+		if (fd == -1) {
+			ERROR("Failed to create socket");
+			continue;
+		}
+
+		if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
+			ERROR("Failed to setsockopt");
+			close(fd);
+			freeaddrinfo(servinfo);
+			return -1;
+		}
+
+		if (bind(fd, p->ai_addr, p->ai_addrlen) == -1) {
+			ERROR("Failed to bind");
+			close(fd);
+			fd = -1;
+			continue;
+		}
+
+		break;
+	}
+
+	freeaddrinfo(servinfo);
+
+	if (fd < 0) {
+		/* Every candidate failed; each socket was already closed */
+		ERROR("Failed to bind");
+		return -1;
+	}
+
+	if (listen(fd, 10) == -1) {
+		ERROR("listen");
+		close(fd);
+		return -1;
+	}
+
+	server->listening_fd = fd;
+
+	tapdisk_nbdserver_unpause(server);
+
+	if (server->listening_event_id < 0) {
+		close(server->listening_fd);
+		server->listening_fd = -1;
+		return -1;
+	}
+
+	INFO("Successfully started NBD server");
+
+	return 0;
+}
+
+
+/*
+ * Stop reacting to network activity: unregister the read event of every
+ * client that currently has one (marking it paused) and unregister the
+ * listening socket's event. Sockets stay open;
+ * tapdisk_nbdserver_unpause() reverses this.
+ */
+void
+tapdisk_nbdserver_pause(td_nbdserver_t *server)
+{
+	struct td_nbdserver_client *pos, *q;
+
+	INFO("NBD server pause(%p)",server);
+
+	list_for_each_entry_safe(pos, q, &server->clients, clientlist){
+		if (pos->paused != 1 && pos->client_event_id >= 0) {
+			tapdisk_nbdserver_disable_client(pos);
+			pos->paused = 1;
+		}
+	}
+
+	if (server->listening_event_id >= 0) {
+		tapdisk_server_unregister_event(server->listening_event_id);
+		/* Forget the id: unpause only re-registers when it is < 0,
+		   and free must not unregister it a second time */
+		server->listening_event_id = -1;
+	}
+}
+
+/*
+ * Resume reacting to network activity: re-register the read event of
+ * every paused client and, if the listening socket exists but has no
+ * event registered, register its accept event.
+ *
+ * Returns the listening event id (negative if none is registered).
+ */
+int
+tapdisk_nbdserver_unpause(td_nbdserver_t *server)
+{
+	struct td_nbdserver_client *client, *next;
+
+	INFO("NBD server unpause(%p) - listening_fd=%d",server,server->listening_fd);
+
+	list_for_each_entry_safe(client, next, &server->clients, clientlist) {
+		if (client->paused != 1)
+			continue;
+
+		tapdisk_nbdserver_enable_client(client);
+		client->paused = 0;
+	}
+
+	/* Nothing to register: already registered, or no socket yet */
+	if (server->listening_event_id >= 0 || server->listening_fd < 0)
+		return server->listening_event_id;
+
+	server->listening_event_id =
+		tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+					      server->listening_fd, 0,
+					      tapdisk_nbdserver_newclient,
+					      server);
+	INFO("registering for listening_fd");
+
+	return server->listening_event_id;
+}
+
+/*
+ * Destroy an NBD server: free every client and close its socket,
+ * unregister and close the listening socket, stop and free the fd
+ * receiver, and free the server itself.
+ */
+void
+tapdisk_nbdserver_free(td_nbdserver_t *server)
+{
+	struct td_nbdserver_client *pos, *q;
+	int fd;
+
+	INFO("NBD server free(%p)",server);
+
+	list_for_each_entry_safe(pos, q, &server->clients, clientlist){
+		/* free_client does not close the socket, so do it here;
+		   read the fd first as the client is gone afterwards */
+		fd = pos->client_fd;
+		tapdisk_nbdserver_free_client(pos);
+		if (fd >= 0)
+			close(fd);
+	}
+
+	if (server->listening_event_id >= 0) {
+		tapdisk_server_unregister_event(server->listening_event_id);
+		server->listening_event_id = -1;
+	}
+
+	if (server->listening_fd >= 0) {
+		close(server->listening_fd);
+		server->listening_fd = -1;
+	}
+
+	if (server->fdreceiver) {
+		td_fdreceiver_stop(server->fdreceiver);
+		/* stop releases the receiver's resources but not the
+		   structure, which tapdisk_nbdserver_alloc obtained for us */
+		free(server->fdreceiver);
+		server->fdreceiver = NULL;
+	}
+
+	free(server);
+}
75 drivers/tapdisk-nbdserver.h
View
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2010, Citrix Systems, Inc.
+ *
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of XenSource Inc. nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _TAPDISK_NBDSERVER_H_
+#define _TAPDISK_NBDSERVER_H_
+
+typedef struct td_nbdserver td_nbdserver_t;
+typedef struct td_nbdserver_req td_nbdserver_req_t;
+typedef struct td_nbdserver_client td_nbdserver_client_t;
+
+#include "blktap.h"
+#include "tapdisk-vbd.h"
+#include "list.h"
+
+/* State of one NBD server instance. */
+struct td_nbdserver {
+ /* VBD that client requests are queued on */
+ td_vbd_t *vbd;
+ /* disk geometry; size * sector_size is advertised during negotiation */
+ td_disk_info_t info;
+
+ /* TCP listening socket and its scheduler event; -1 when unset */
+ int listening_fd;
+ int listening_event_id;
+
+ /* unix domain socket receiver through which client fds are passed in */
+ struct td_fdreceiver *fdreceiver;
+ /* list of td_nbdserver_client, linked through their clientlist member */
+ struct list_head clients;
+};
+
+/* State of one connected NBD client. */
+struct td_nbdserver_client {
+ /* number of entries in reqs, iovecs and reqs_free */
+ int n_reqs;
+ /* request pool */
+ td_nbdserver_req_t *reqs;
+ struct td_iovec *iovecs;
+ /* number of valid entries in reqs_free, which is used as a stack */
+ int n_reqs_free;
+ td_nbdserver_req_t **reqs_free;
+
+ /* client socket and its scheduler read event; -1 when unset */
+ int client_fd;
+ int client_event_id;
+
+ /* owning server, and link in its clients list */
+ td_nbdserver_t *server;
+ struct list_head clientlist;
+
+ /* 1 while the read event is unregistered by tapdisk_nbdserver_pause() */
+ int paused;
+};
+
+td_nbdserver_t *tapdisk_nbdserver_alloc(td_vbd_t *, td_disk_info_t);
+int tapdisk_nbdserver_listen(td_nbdserver_t *, int);
+void tapdisk_nbdserver_free(td_nbdserver_t *);
+void tapdisk_nbdserver_pause(td_nbdserver_t *);
+int tapdisk_nbdserver_unpause(td_nbdserver_t *);
+
+
+#endif /* _TAPDISK_NBDSERVER_H_ */
18 drivers/tapdisk-utils.c
View
@@ -41,6 +41,8 @@
#include <sys/ioctl.h>
#include <sys/resource.h>
#include <sys/utsname.h>
+#include <arpa/inet.h>
+
#ifdef __linux__
#include <linux/version.h>
#endif
@@ -268,3 +270,19 @@ int tapdisk_linux_version(void)
}
#endif
+
+/*
+ * Convert a 64-bit value between network (big-endian) and host byte
+ * order. The conversion is its own inverse, so the same function serves
+ * as htonll.
+ *
+ * The bytes of the argument are read in memory order and assembled most
+ * significant first, which gives the right result on any host without
+ * relying on a build-time endianness macro.
+ */
+uint64_t ntohll(uint64_t a) {
+	const unsigned char *bytes = (const unsigned char *)&a;
+	uint64_t host = 0;
+	unsigned int i;
+
+	for (i = 0; i < sizeof(a); i++)
+		host = (host << 8) | bytes[i];
+
+	return host;
+}
+#define htonll ntohll
+
3  drivers/tapdisk-utils.h
View
@@ -44,5 +44,8 @@ int tapdisk_namedup(char **, const char *);
int tapdisk_parse_disk_type(const char *, char **, int *);
int tapdisk_get_image_size(int, uint64_t *, uint32_t *);
int tapdisk_linux_version(void);
+uint64_t ntohll(uint64_t);
+#define htonll ntohll
+
#endif
94 drivers/tapdisk-vbd.c
View
@@ -47,15 +47,20 @@
#include "tapdisk-image.h"
#include "tapdisk-driver.h"
#include "tapdisk-server.h"
+
#include "tapdisk-vbd.h"
#include "tapdisk-disktype.h"
#include "tapdisk-interface.h"
#include "tapdisk-stats.h"
#include "tapdisk-storage.h"
+#include "tapdisk-nbdserver.h"
#define DBG(_level, _f, _a...) tlog_write(_level, _f, ##_a)
#define ERR(_err, _f, _a...) tlog_error(_err, _f, ##_a)
+#define INFO(_f, _a...) tlog_syslog(TLOG_INFO, "vbd: " _f, ##_a)
+#define ERROR(_f, _a...) tlog_syslog(TLOG_WARN, "vbd: " _f, ##_a)
+
#if 1
#define ASSERT(p) \
do { \
@@ -273,6 +278,13 @@ tapdisk_vbd_add_secondary(td_vbd_t *vbd)
const char *path;
int type, err;
+ if(strcmp(vbd->secondary_name, "null")==0) {
+ DPRINTF("Removing secondary image\n");
+ vbd->secondary_mode=TD_VBD_SECONDARY_DISABLED;
+ vbd->secondary=NULL;
+ return 0;
+ }
+
DPRINTF("Adding secondary image: %s\n", vbd->secondary_name);
type = tapdisk_disktype_parse_params(vbd->secondary_name, &path);
@@ -286,9 +298,18 @@ tapdisk_vbd_add_secondary(td_vbd_t *vbd)
}
err = tapdisk_image_open(type, path, leaf->flags, &second);
- if (err)
+ if (err) {
+ if(type == DISK_TYPE_NBD) {
+ vbd->nbd_mirror_failed = 1;
+ }
+
+ vbd->secondary=NULL;
+ vbd->secondary_mode=TD_VBD_SECONDARY_DISABLED;
+
goto fail;
+ }
+
if (second->info.size != leaf->info.size) {
EPRINTF("Secondary image size %"PRIu64" != image size %"PRIu64"\n",
second->info.size, leaf->info.size);
@@ -476,8 +497,13 @@ tapdisk_vbd_open_vdi(td_vbd_t *vbd, const char *name, td_flag_t flags, int prt_d
if (td_flag_test(vbd->flags, TD_OPEN_SECONDARY)) {
err = tapdisk_vbd_add_secondary(vbd);
- if (err)
+ if (err) {
+ if(vbd->nbd_mirror_failed != 1)
goto fail;
+
+ INFO("Ignoring failed NBD secondary attach\n");
+ err=0;
+ }
}
if (tmp != vbd->name)
@@ -520,6 +546,7 @@ tapdisk_vbd_attach(td_vbd_t *vbd, const char *devname, int minor)
return tapdisk_blktap_open(devname, vbd, &vbd->tap);
}
+/*
int