diff --git a/ChangeLog.rst b/ChangeLog.rst index cef97400..997feec7 100644 --- a/ChangeLog.rst +++ b/ChangeLog.rst @@ -1,3 +1,10 @@ +Unreleased Changes +------------------ + +* New max_conns option enables the use of multiple connections to improve responsiveness + during large file transfers. Thanks to Timo Savola for doing most of the implementation + work, and thanks to CEA.fr for sponsoring remaining bugfixes and cleanups! + Release 3.6.0 (2019-11-03) -------------------------- diff --git a/sshfs.c b/sshfs.c index 46612025..8cd89d38 100644 --- a/sshfs.c +++ b/sshfs.c @@ -138,16 +138,101 @@ #define MAX_PASSWORD 1024 +/* + Handling of multiple SFTP connections + -------------------------------------- + + An SFTP server is free to return responses to outstanding requests in arbitrary + order. However, execution of requests may only be re-ordered and parallelized as long + as "the results in the responses will be the same as if [the client] had sent the + requests one at a time and waited for the response in each case". + (https://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.1). + + When using multiple connections, this requirement applies independently for each + connection. We therefore have to make sure in SSHFS that the way in which we distribute + requests between connections does not affect the responses that we get. + + In general, this is a tricky problem to solve since for each incoming request we have + to determine which other in-flight requests may interact with it, and then either + transmit the request through the same connection or (if there are multiple connections + involved) wait for the other requests to complete. This means that e.g. a readdir + request would have to block on most other activity in the same directory, eliminating a + major advantage of using multiple connections. + + In practice, we can luckily take advantage of the knowledge that most FUSE requests are + the result of (synchronous) syscalls from userspace that will block until the + corresponding FUSE response has been sent. + + If -o sshfs_sync is used, SSHFS always waits for the SFTP server response before + returning a FUSE response to userspace. If userspace makes concurrent system calls, + there is no ordering guarantee in the first place, so we do not have to worry about + (re-)ordering within SSHFS either. + + For requests that originate in the kernel (rather than userspace), the situation is + slightly different. Transmission of FUSE requests and responses is decoupled (there are + no synchronous calls) and there is no formal specification that defines if reordering + is permitted. However, the Linux kernel seems to avoid submitting any concurrent + requests that would give different results depending on execution order and (as of + kernel 4.20 with writeback caching disabled) the only kind of kernel originated + requests are read() requests for read-ahead. Since libfuse internally uses multiple + threads, SSHFS does not necessarily receive requests in the order in which they were + sent by the kernel. Unless there is a major bug in FUSE, there is therefore no need to + worry about correct sequencing of such calls even when using multiple SFTP connections. + + If -o sshfs_sync is *not* used, then write() syscalls will return to userspace before + SSHFS has received responses from the SFTP server. If userspace then issues a second + syscall related to the same file (and only one connection is in-use), SFTP ordering + guarantees will ensure that the response takes into account the preceding writes. If + multiple connections are in use, this has to be ensured by SSHFS instead. + + The easiest way to do so would be to bind specific SFTP connections to file + handles. Unfortunately, not all requests for the same dentry are guaranteed to come + from the same file handle and some requests may come without any file handle. We + therefore maintain a separate mapping from currently open files to SFTP connections. If + a request comes in for a path contained in sshfs.conntab and its result could be + changed by a pending write() operation, it will always be executed with the + associated SFTP connection. + + There are additional subtleties for requests that affect multiple paths. For example, + if both source and destination of a rename() request are currently open, which + connection should be used? + + This problem is again hard in general, but solvable since we only have to worry about + the effects of pending write() calls. For rename() and link(), it does not matter if a + pending write is executed before or after the operation. For readdir(), it is possible + that a pending write() will change the length of the file. However, SSHFS currently + does not return attribute information for readdir(), so this does not pose problems + either. Should SSHFS implement a readdirplus() handler (which provides file names and + attributes) this is a problem that will need to be solved. +*/ + + #ifdef __APPLE__ static char sshfs_program_path[PATH_MAX] = { 0 }; #endif /* __APPLE__ */ +struct conn { + pthread_mutex_t lock_write; + int processing_thread_started; + int rfd; + int wfd; + int connver; + int req_count; + int dir_count; + int file_count; +}; + struct buffer { uint8_t *p; size_t len; size_t size; }; +struct dir_handle { + struct buffer buf; + struct conn *conn; +}; + struct list_head { struct list_head *prev; struct list_head *next; @@ -169,6 +254,7 @@ struct request { request_func end_func; size_t len; struct list_head list; + struct conn *conn; }; struct sshfs_io { @@ -202,6 +288,7 @@ struct sshfs_file { struct read_chunk *readahead; off_t next_pos; int is_seq; + struct conn *conn; int connver; int modifver; }; @@ -253,15 +340,14 @@ struct sshfs { char *host; char *base_path; GHashTable *reqtab; + GHashTable *conntab; pthread_mutex_t lock; - pthread_mutex_t lock_write; - int processing_thread_started; unsigned int randseed; - int rfd; - int wfd; + int max_conns; + struct conn *conns; int ptyfd; int ptyslavefd; - int connver; + int connvers; int server_version; unsigned remote_uid; unsigned local_uid; @@ -411,6 +497,7 @@ static struct fuse_opt sshfs_opts[] = { SSHFS_OPT("dir_cache=yes", dir_cache, 1), SSHFS_OPT("dir_cache=no", dir_cache, 0), SSHFS_OPT("direct_io", direct_io, 1), + SSHFS_OPT("max_conns=%u", max_conns, 1), SSHFS_OPT("-h", show_help, 1), SSHFS_OPT("--help", show_help, 1), @@ -904,7 +991,7 @@ static void ssh_add_arg(const char *arg) } -static int pty_expect_loop(void) +static int pty_expect_loop(struct conn *conn) { int res; char buf[256]; @@ -917,7 +1004,7 @@ static int pty_expect_loop(void) while (1) { struct pollfd fds[2]; - fds[0].fd = sshfs.rfd; + fds[0].fd = conn->rfd; fds[0].events = POLLIN; fds[1].fd = sshfs.ptyfd; fds[1].events = POLLIN; @@ -972,6 +1059,41 @@ static int pty_expect_loop(void) return 0; } +static struct conn* get_conn(const struct sshfs_file *sf, + const char *path) +{ + struct conn* conn; + + if (sshfs.max_conns == 1) + return &sshfs.conns[0]; + + if (sf != NULL) + return sf->conn; + + if (path != NULL) { + pthread_mutex_lock(&sshfs.lock); + conn = g_hash_table_lookup(sshfs.conntab, path); + pthread_mutex_unlock(&sshfs.lock); + + if (conn != NULL) + return conn; + } + + int best_index = 0; + uint64_t best_score = ~0ULL; /* smaller is better */ + for (int i = 0; i < sshfs.max_conns; i++) { + uint64_t score = ((uint64_t) sshfs.conns[i].req_count << 43) + + ((uint64_t) sshfs.conns[i].dir_count << 22) + + ((uint64_t) sshfs.conns[i].file_count << 1) + + (uint64_t) (sshfs.conns[i].rfd >= 0 ? 0 : 1); + if (score < best_score) { + best_index = i; + best_score = score; + } + } + return &sshfs.conns[best_index]; +} + static int pty_master(char **name) { int mfd; @@ -1004,7 +1126,7 @@ static void replace_arg(char **argp, const char *newarg) } } -static int start_ssh(void) +static int start_ssh(struct conn *conn) { char *ptyname = NULL; int sockpair[2]; @@ -1025,8 +1147,8 @@ static int start_ssh(void) perror("failed to create socket pair"); return -1; } - sshfs.rfd = sockpair[0]; - sshfs.wfd = sockpair[0]; + conn->rfd = sockpair[0]; + conn->wfd = sockpair[0]; pid = fork(); if (pid == -1) { @@ -1094,14 +1216,14 @@ static int start_ssh(void) return 0; } -static int connect_slave() +static int connect_slave(struct conn *conn) { - sshfs.rfd = STDIN_FILENO; - sshfs.wfd = STDOUT_FILENO; + conn->rfd = STDIN_FILENO; + conn->wfd = STDOUT_FILENO; return 0; } -static int connect_to(char *host, char *port) +static int connect_to(struct conn *conn, char *host, char *port) { int err; int sock; @@ -1138,16 +1260,16 @@ static int connect_to(char *host, char *port) freeaddrinfo(ai); - sshfs.rfd = sock; - sshfs.wfd = sock; + conn->rfd = sock; + conn->wfd = sock; return 0; } -static int do_write(struct iovec *iov, size_t count) +static int do_write(struct conn *conn, struct iovec *iov, size_t count) { int res; while (count) { - res = writev(sshfs.wfd, iov, count); + res = writev(conn->wfd, iov, count); if (res == -1) { perror("write"); return -1; @@ -1194,8 +1316,8 @@ static size_t iov_length(const struct iovec *iov, unsigned long nr_segs) #define SFTP_MAX_IOV 3 -static int sftp_send_iov(uint8_t type, uint32_t id, struct iovec iov[], - size_t count) +static int sftp_send_iov(struct conn *conn, uint8_t type, uint32_t id, + struct iovec iov[], size_t count) { int res; struct buffer buf; @@ -1211,20 +1333,20 @@ static int sftp_send_iov(uint8_t type, uint32_t id, struct iovec iov[], buf_to_iov(&buf, &iovout[nout++]); for (i = 0; i < count; i++) iovout[nout++] = iov[i]; - pthread_mutex_lock(&sshfs.lock_write); - res = do_write(iovout, nout); - pthread_mutex_unlock(&sshfs.lock_write); + pthread_mutex_lock(&conn->lock_write); + res = do_write(conn, iovout, nout); + pthread_mutex_unlock(&conn->lock_write); buf_free(&buf); return res; } -static int do_read(struct buffer *buf) +static int do_read(struct conn *conn, struct buffer *buf) { int res; uint8_t *p = buf->p; size_t size = buf->size; while (size) { - res = read(sshfs.rfd, p, size); + res = read(conn->rfd, p, size); if (res == -1) { perror("read"); return -1; @@ -1238,13 +1360,13 @@ static int do_read(struct buffer *buf) return 0; } -static int sftp_read(uint8_t *type, struct buffer *buf) +static int sftp_read(struct conn *conn, uint8_t *type, struct buffer *buf) { int res; struct buffer buf2; uint32_t len; buf_init(&buf2, 5); - res = do_read(&buf2); + res = do_read(conn, &buf2); if (res != -1) { if (buf_get_uint32(&buf2, &len) == -1) return -1; @@ -1255,7 +1377,7 @@ static int sftp_read(uint8_t *type, struct buffer *buf) if (buf_get_uint8(&buf2, type) == -1) return -1; buf_init(buf, len - 1); - res = do_read(buf); + res = do_read(conn, buf); } buf_free(&buf2); return res; @@ -1263,6 +1385,7 @@ static int sftp_read(uint8_t *type, struct buffer *buf) static void request_free(struct request *req) { + req->conn->req_count--; buf_free(&req->reply); sem_destroy(&req->ready); g_free(req); @@ -1297,10 +1420,13 @@ static void chunk_put_locked(struct read_chunk *chunk) pthread_mutex_unlock(&sshfs.lock); } -static int clean_req(void *key_, struct request *req, gpointer user_data_) +static int clean_req(void *key, struct request *req, gpointer user_data) { - (void) key_; - (void) user_data_; + (void) key; + struct conn* conn = (struct conn*) user_data; + + if (req->conn != conn) + return FALSE; req->error = -EIO; if (req->want_reply) @@ -1313,7 +1439,7 @@ static int clean_req(void *key_, struct request *req, gpointer user_data_) return TRUE; } -static int process_one_request(void) +static int process_one_request(struct conn *conn) { int res; struct buffer buf; @@ -1322,7 +1448,7 @@ static int process_one_request(void) uint32_t id; buf_init(&buf, 0); - res = sftp_read(&type, &buf); + res = sftp_read(conn, &type, &buf); if (res == -1) return -1; if (buf_get_uint32(&buf, &id) == -1) @@ -1384,13 +1510,13 @@ static int process_one_request(void) return 0; } -static void close_conn(void) +static void close_conn(struct conn *conn) { - close(sshfs.rfd); - if (sshfs.rfd != sshfs.wfd) - close(sshfs.wfd); - sshfs.rfd = -1; - sshfs.wfd = -1; + close(conn->rfd); + if (conn->rfd != conn->wfd) + close(conn->wfd); + conn->rfd = -1; + conn->wfd = -1; if (sshfs.ptyfd != -1) { close(sshfs.ptyfd); sshfs.ptyfd = -1; @@ -1404,17 +1530,18 @@ static void close_conn(void) static void *process_requests(void *data_) { (void) data_; + struct conn *conn = data_; while (1) { - if (process_one_request() == -1) + if (process_one_request(conn) == -1) break; } pthread_mutex_lock(&sshfs.lock); - sshfs.processing_thread_started = 0; - close_conn(); - g_hash_table_foreach_remove(sshfs.reqtab, (GHRFunc) clean_req, NULL); - sshfs.connver ++; + conn->processing_thread_started = 0; + close_conn(conn); + g_hash_table_foreach_remove(sshfs.reqtab, (GHRFunc) clean_req, conn); + conn->connver = ++sshfs.connvers; sshfs.outstanding_len = 0; pthread_cond_broadcast(&sshfs.outstanding_cond); pthread_mutex_unlock(&sshfs.lock); @@ -1426,7 +1553,8 @@ static void *process_requests(void *data_) return NULL; } -static int sftp_init_reply_ok(struct buffer *buf, uint32_t *version) +static int sftp_init_reply_ok(struct conn *conn, struct buffer *buf, + uint32_t *version) { uint32_t len; uint8_t type; @@ -1452,7 +1580,7 @@ static int sftp_init_reply_ok(struct buffer *buf, uint32_t *version) struct buffer buf2; buf_init(&buf2, len - 5); - if (do_read(&buf2) == -1) { + if (do_read(conn, &buf2) == -1) { buf_free(&buf2); return -1; } @@ -1489,17 +1617,17 @@ static int sftp_init_reply_ok(struct buffer *buf, uint32_t *version) return 0; } -static int sftp_find_init_reply(uint32_t *version) +static int sftp_find_init_reply(struct conn *conn, uint32_t *version) { int res; struct buffer buf; buf_init(&buf, 9); - res = do_read(&buf); + res = do_read(conn, &buf); while (res != -1) { struct buffer buf2; - res = sftp_init_reply_ok(&buf, version); + res = sftp_init_reply_ok(conn, &buf, version); if (res <= 0) break; @@ -1509,25 +1637,25 @@ static int sftp_find_init_reply(uint32_t *version) buf.len = 0; buf2.p = buf.p + buf.size - 1; buf2.size = 1; - res = do_read(&buf2); + res = do_read(conn, &buf2); } buf_free(&buf); return res; } -static int sftp_init() +static int sftp_init(struct conn *conn) { int res = -1; uint32_t version = 0; struct buffer buf; buf_init(&buf, 0); - if (sftp_send_iov(SSH_FXP_INIT, PROTO_VERSION, NULL, 0) == -1) + if (sftp_send_iov(conn, SSH_FXP_INIT, PROTO_VERSION, NULL, 0) == -1) goto out; - if (sshfs.password_stdin && pty_expect_loop() == -1) + if (sshfs.password_stdin && pty_expect_loop(conn) == -1) goto out; - if (sftp_find_init_reply(&version) == -1) + if (sftp_find_init_reply(conn, &version) == -1) goto out; sshfs.server_version = version; @@ -1558,7 +1686,7 @@ static int sftp_error_to_errno(uint32_t error) } } -static void sftp_detect_uid() +static void sftp_detect_uid(struct conn *conn) { int flags; uint32_t id = sftp_get_id(); @@ -1571,10 +1699,10 @@ static void sftp_detect_uid() buf_init(&buf, 5); buf_add_string(&buf, "."); buf_to_iov(&buf, &iov[0]); - if (sftp_send_iov(SSH_FXP_STAT, id, iov, 1) == -1) + if (sftp_send_iov(conn, SSH_FXP_STAT, id, iov, 1) == -1) goto out; buf_clear(&buf); - if (sftp_read(&type, &buf) == -1) + if (sftp_read(conn, &type, &buf) == -1) goto out; if (type != SSH_FXP_ATTRS && type != SSH_FXP_STATUS) { fprintf(stderr, "protocol error\n"); @@ -1614,7 +1742,7 @@ static void sftp_detect_uid() buf_free(&buf); } -static int sftp_check_root(const char *base_path) +static int sftp_check_root(struct conn *conn, const char *base_path) { int flags; uint32_t id = sftp_get_id(); @@ -1629,10 +1757,10 @@ static int sftp_check_root(const char *base_path) buf_init(&buf, 0); buf_add_string(&buf, remote_dir); buf_to_iov(&buf, &iov[0]); - if (sftp_send_iov(SSH_FXP_LSTAT, id, iov, 1) == -1) + if (sftp_send_iov(conn, SSH_FXP_LSTAT, id, iov, 1) == -1) goto out; buf_clear(&buf); - if (sftp_read(&type, &buf) == -1) + if (sftp_read(conn, &type, &buf) == -1) goto out; if (type != SSH_FXP_ATTRS && type != SSH_FXP_STATUS) { fprintf(stderr, "protocol error\n"); @@ -1677,45 +1805,45 @@ static int sftp_check_root(const char *base_path) return err; } -static int connect_remote(void) +static int connect_remote(struct conn *conn) { int err; if (sshfs.slave) - err = connect_slave(); + err = connect_slave(conn); else if (sshfs.directport) - err = connect_to(sshfs.host, sshfs.directport); + err = connect_to(conn, sshfs.host, sshfs.directport); else - err = start_ssh(); + err = start_ssh(conn); if (!err) - err = sftp_init(); + err = sftp_init(conn); if (err) - close_conn(); + close_conn(conn); else sshfs.num_connect++; return err; } -static int start_processing_thread(void) +static int start_processing_thread(struct conn *conn) { int err; pthread_t thread_id; sigset_t oldset; sigset_t newset; - if (sshfs.processing_thread_started) + if (conn->processing_thread_started) return 0; - if (sshfs.rfd == -1) { - err = connect_remote(); + if (conn->rfd == -1) { + err = connect_remote(conn); if (err) return -EIO; } if (sshfs.detect_uid) { - sftp_detect_uid(); + sftp_detect_uid(conn); sshfs.detect_uid = 0; } @@ -1725,14 +1853,14 @@ static int start_processing_thread(void) sigaddset(&newset, SIGHUP); sigaddset(&newset, SIGQUIT); pthread_sigmask(SIG_BLOCK, &newset, &oldset); - err = pthread_create(&thread_id, NULL, process_requests, NULL); + err = pthread_create(&thread_id, NULL, process_requests, conn); if (err) { fprintf(stderr, "failed to create thread: %s\n", strerror(err)); return -EIO; } pthread_detach(thread_id); pthread_sigmask(SIG_SETMASK, &oldset, NULL); - sshfs.processing_thread_started = 1; + conn->processing_thread_started = 1; return 0; } @@ -1746,11 +1874,15 @@ static void *sshfs_init(struct fuse_conn_info *conn, // These workarounds require the "path" argument. cfg->nullpath_ok = !(sshfs.truncate_workaround || sshfs.fstat_workaround); + // When using multiple connections, release() needs to know the path + if (sshfs.max_conns > 1) + cfg->nullpath_ok = 0; + // Lookup of . and .. is supported conn->capable |= FUSE_CAP_EXPORT_SUPPORT; if (!sshfs.delay_connect) - start_processing_thread(); + start_processing_thread(&sshfs.conns[0]); // SFTP only supports 1-second time resolution conn->time_gran = 1000000000; @@ -1824,10 +1956,9 @@ static int sftp_request_wait(struct request *req, uint8_t type, return err; } -static int sftp_request_send(uint8_t type, struct iovec *iov, size_t count, - request_func begin_func, request_func end_func, - int want_reply, void *data, - struct request **reqp) +static int sftp_request_send(struct conn *conn, uint8_t type, struct iovec *iov, + size_t count, request_func begin_func, request_func end_func, + int want_reply, void *data, struct request **reqp) { int err; uint32_t id; @@ -1843,7 +1974,9 @@ static int sftp_request_send(uint8_t type, struct iovec *iov, size_t count, begin_func(req); id = sftp_get_id(); req->id = id; - err = start_processing_thread(); + req->conn = conn; + req->conn->req_count++; + err = start_processing_thread(conn); if (err) { pthread_mutex_unlock(&sshfs.lock); goto out; @@ -1863,7 +1996,7 @@ static int sftp_request_send(uint8_t type, struct iovec *iov, size_t count, pthread_mutex_unlock(&sshfs.lock); err = -EIO; - if (sftp_send_iov(type, id, iov, count) == -1) { + if (sftp_send_iov(conn, type, id, iov, count) == -1) { gboolean rmed; pthread_mutex_lock(&sshfs.lock); @@ -1890,28 +2023,27 @@ static int sftp_request_send(uint8_t type, struct iovec *iov, size_t count, return err; } - -static int sftp_request_iov(uint8_t type, struct iovec *iov, size_t count, - uint8_t expect_type, struct buffer *outbuf) +static int sftp_request_iov(struct conn *conn, uint8_t type, struct iovec *iov, + size_t count, uint8_t expect_type, struct buffer *outbuf) { int err; struct request *req; - err = sftp_request_send(type, iov, count, NULL, NULL, expect_type, NULL, - &req); + err = sftp_request_send(conn, type, iov, count, NULL, NULL, + expect_type, NULL, &req); if (expect_type == 0) return err; return sftp_request_wait(req, type, expect_type, outbuf); } -static int sftp_request(uint8_t type, const struct buffer *buf, - uint8_t expect_type, struct buffer *outbuf) +static int sftp_request(struct conn *conn, uint8_t type, const struct buffer *buf, + uint8_t expect_type, struct buffer *outbuf) { struct iovec iov; buf_to_iov(buf, &iov); - return sftp_request_iov(type, &iov, 1, expect_type, outbuf); + return sftp_request_iov(conn, type, &iov, 1, expect_type, outbuf); } static int sshfs_access(const char *path, int mask) @@ -2009,7 +2141,8 @@ static int sshfs_readlink(const char *path, char *linkbuf, size_t size) buf_init(&buf, 0); buf_add_path(&buf, path); - err = sftp_request(SSH_FXP_READLINK, &buf, SSH_FXP_NAME, &name); + // Commutes with pending write(), so we can use any connection + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_READLINK, &buf, SSH_FXP_NAME, &name); if (!err) { uint32_t count; char *link; @@ -2029,12 +2162,13 @@ static int sshfs_readlink(const char *path, char *linkbuf, size_t size) return err; } -static int sftp_readdir_send(struct request **req, struct buffer *handle) +static int sftp_readdir_send(struct conn *conn, struct request **req, + struct buffer *handle) { struct iovec iov; buf_to_iov(handle, &iov); - return sftp_request_send(SSH_FXP_READDIR, &iov, 1, NULL, NULL, + return sftp_request_send(conn, SSH_FXP_READDIR, &iov, 1, NULL, NULL, SSH_FXP_NAME, NULL, req); } @@ -2046,8 +2180,8 @@ static int sshfs_req_pending(struct request *req) return 0; } -static int sftp_readdir_async(struct buffer *handle, void *buf, off_t offset, - fuse_fill_dir_t filler) +static int sftp_readdir_async(struct conn *conn, struct buffer *handle, + void *buf, off_t offset, fuse_fill_dir_t filler) { int err = 0; int outstanding = 0; @@ -2063,7 +2197,7 @@ static int sftp_readdir_async(struct buffer *handle, void *buf, off_t offset, int tmperr; while (!done && outstanding < max) { - tmperr = sftp_readdir_send(&req, handle); + tmperr = sftp_readdir_send(conn, &req, handle); if (tmperr && !done) { err = tmperr; @@ -2124,14 +2258,14 @@ static int sftp_readdir_async(struct buffer *handle, void *buf, off_t offset, return err; } -static int sftp_readdir_sync(struct buffer *handle, void *buf, off_t offset, - fuse_fill_dir_t filler) +static int sftp_readdir_sync(struct conn *conn, struct buffer *handle, + void *buf, off_t offset, fuse_fill_dir_t filler) { int err; assert(offset == 0); do { struct buffer name; - err = sftp_request(SSH_FXP_READDIR, handle, SSH_FXP_NAME, &name); + err = sftp_request(conn, SSH_FXP_READDIR, handle, SSH_FXP_NAME, &name); if (!err) { err = buf_get_entries(&name, buf, filler); buf_free(&name); @@ -2146,21 +2280,28 @@ static int sftp_readdir_sync(struct buffer *handle, void *buf, off_t offset, static int sshfs_opendir(const char *path, struct fuse_file_info *fi) { int err; + struct conn *conn; struct buffer buf; - struct buffer *handle; + struct dir_handle *handle; - handle = malloc(sizeof(struct buffer)); + handle = g_new0(struct dir_handle, 1); if(handle == NULL) return -ENOMEM; + // Commutes with pending write(), so we can use any connection + conn = get_conn(NULL, NULL); buf_init(&buf, 0); buf_add_path(&buf, path); - err = sftp_request(SSH_FXP_OPENDIR, &buf, SSH_FXP_HANDLE, handle); + err = sftp_request(conn, SSH_FXP_OPENDIR, &buf, SSH_FXP_HANDLE, &handle->buf); if (!err) { - buf_finish(handle); + buf_finish(&handle->buf); + pthread_mutex_lock(&sshfs.lock); + handle->conn = conn; + handle->conn->dir_count++; + pthread_mutex_unlock(&sshfs.lock); fi->fh = (unsigned long) handle; } else - free(handle); + g_free(handle); buf_free(&buf); return err; } @@ -2171,14 +2312,16 @@ static int sshfs_readdir(const char *path, void *dbuf, fuse_fill_dir_t filler, { (void) path; (void) flags; int err; - struct buffer *handle; + struct dir_handle *handle; - handle = (struct buffer*) fi->fh; + handle = (struct dir_handle*) fi->fh; if (sshfs.sync_readdir) - err = sftp_readdir_sync(handle, dbuf, offset, filler); + err = sftp_readdir_sync(handle->conn, &handle->buf, dbuf, + offset, filler); else - err = sftp_readdir_async(handle, dbuf, offset, filler); + err = sftp_readdir_async(handle->conn, &handle->buf, dbuf, + offset, filler); return err; } @@ -2187,12 +2330,15 @@ static int sshfs_releasedir(const char *path, struct fuse_file_info *fi) { (void) path; int err; - struct buffer *handle; + struct dir_handle *handle; - handle = (struct buffer*) fi->fh; - err = sftp_request(SSH_FXP_CLOSE, handle, 0, NULL); - buf_free(handle); - free(handle); + handle = (struct dir_handle*) fi->fh; + err = sftp_request(handle->conn, SSH_FXP_CLOSE, &handle->buf, 0, NULL); + pthread_mutex_lock(&sshfs.lock); + handle->conn->dir_count--; + pthread_mutex_unlock(&sshfs.lock); + buf_free(&handle->buf); + g_free(handle); return err; } @@ -2205,7 +2351,8 @@ static int sshfs_mkdir(const char *path, mode_t mode) buf_add_path(&buf, path); buf_add_uint32(&buf, SSH_FILEXFER_ATTR_PERMISSIONS); buf_add_uint32(&buf, mode); - err = sftp_request(SSH_FXP_MKDIR, &buf, SSH_FXP_STATUS, NULL); + // Commutes with pending write(), so we can use any connection + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_MKDIR, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; } @@ -2213,6 +2360,7 @@ static int sshfs_mkdir(const char *path, mode_t mode) static int sshfs_mknod(const char *path, mode_t mode, dev_t rdev) { int err; + struct conn *conn; struct buffer buf; struct buffer handle; (void) rdev; @@ -2220,17 +2368,19 @@ static int sshfs_mknod(const char *path, mode_t mode, dev_t rdev) if ((mode & S_IFMT) != S_IFREG) return -EPERM; + // Commutes with pending write(), so we can use any connection + conn = get_conn(NULL, NULL); + buf_init(&buf, 0); buf_add_path(&buf, path); buf_add_uint32(&buf, SSH_FXF_WRITE | SSH_FXF_CREAT | SSH_FXF_EXCL); buf_add_uint32(&buf, SSH_FILEXFER_ATTR_PERMISSIONS); buf_add_uint32(&buf, mode); - err = sftp_request(SSH_FXP_OPEN, &buf, SSH_FXP_HANDLE, &handle); + err = sftp_request(conn, SSH_FXP_OPEN, &buf, SSH_FXP_HANDLE, &handle); if (!err) { int err2; buf_finish(&handle); - err2 = sftp_request(SSH_FXP_CLOSE, &handle, SSH_FXP_STATUS, - NULL); + err2 = sftp_request(conn, SSH_FXP_CLOSE, &handle, SSH_FXP_STATUS, NULL); if (!err) err = err2; buf_free(&handle); @@ -2252,7 +2402,8 @@ static int sshfs_symlink(const char *from, const char *to) buf_init(&buf, 0); buf_add_string(&buf, from); buf_add_path(&buf, to); - err = sftp_request(SSH_FXP_SYMLINK, &buf, SSH_FXP_STATUS, NULL); + // Commutes with pending write(), so we can use any connection + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_SYMLINK, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; } @@ -2263,7 +2414,8 @@ static int sshfs_unlink(const char *path) struct buffer buf; buf_init(&buf, 0); buf_add_path(&buf, path); - err = sftp_request(SSH_FXP_REMOVE, &buf, SSH_FXP_STATUS, NULL); + // Commutes with pending write(), so we can use any connection + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_REMOVE, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; } @@ -2274,7 +2426,8 @@ static int sshfs_rmdir(const char *path) struct buffer buf; buf_init(&buf, 0); buf_add_path(&buf, path); - err = sftp_request(SSH_FXP_RMDIR, &buf, SSH_FXP_STATUS, NULL); + // Commutes with pending write(), so we can use any connection + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_RMDIR, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; } @@ -2286,7 +2439,8 @@ static int sshfs_do_rename(const char *from, const char *to) buf_init(&buf, 0); buf_add_path(&buf, from); buf_add_path(&buf, to); - err = sftp_request(SSH_FXP_RENAME, &buf, SSH_FXP_STATUS, NULL); + // Commutes with pending write(), so we can use any connection + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_RENAME, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; } @@ -2299,7 +2453,8 @@ static int sshfs_ext_posix_rename(const char *from, const char *to) buf_add_string(&buf, SFTP_EXT_POSIX_RENAME); buf_add_path(&buf, from); buf_add_path(&buf, to); - err = sftp_request(SSH_FXP_EXTENDED, &buf, SSH_FXP_STATUS, NULL); + // Commutes with pending write(), so we can use any connection + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_EXTENDED, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; } @@ -2318,7 +2473,7 @@ static int sshfs_rename(const char *from, const char *to, unsigned int flags) if(flags != 0) return -EINVAL; - + if (sshfs.ext_posix_rename) err = sshfs_ext_posix_rename(from, to); else @@ -2342,6 +2497,15 @@ static int sshfs_rename(const char *from, const char *to, unsigned int flags) } if (err == -EPERM && sshfs.renamexdev_workaround) err = -EXDEV; + + if (!err && sshfs.max_conns > 1) { + void *conn = g_hash_table_lookup(sshfs.conntab, from); + if (conn != NULL) { + g_hash_table_replace(sshfs.conntab, g_strdup(to), conn); + g_hash_table_remove(sshfs.conntab, from); + } + } + return err; } @@ -2356,7 +2520,8 @@ static int sshfs_link(const char *from, const char *to) buf_add_string(&buf, SFTP_EXT_HARDLINK); buf_add_path(&buf, from); buf_add_path(&buf, to); - err = sftp_request(SSH_FXP_EXTENDED, &buf, SSH_FXP_STATUS, + // Commutes with pending write(), so we can use any connection + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_EXTENDED, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); } @@ -2369,7 +2534,7 @@ static inline int sshfs_file_is_conn(struct sshfs_file *sf) int ret; pthread_mutex_lock(&sshfs.lock); - ret = (sf->connver == sshfs.connver); + ret = (sf->connver == sf->conn->connver); pthread_mutex_unlock(&sshfs.lock); return ret; @@ -2404,7 +2569,10 @@ static int sshfs_chmod(const char *path, mode_t mode, buf_add_uint32(&buf, mode); /* FIXME: really needs LSETSTAT extension (debian Bug#640038) */ - err = sftp_request(sf == NULL ? SSH_FXP_SETSTAT : SSH_FXP_FSETSTAT, + // Commutes with pending write(), so we can use any connection + // if the file is not open. + err = sftp_request(get_conn(sf, NULL), + sf == NULL ? SSH_FXP_SETSTAT : SSH_FXP_FSETSTAT, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; @@ -2446,7 +2614,10 @@ static int sshfs_chown(const char *path, uid_t uid, gid_t gid, buf_add_uint32(&buf, uid); buf_add_uint32(&buf, gid); - err = sftp_request(sf == NULL ? SSH_FXP_SETSTAT : SSH_FXP_FSETSTAT, + // Commutes with pending write(), so we can use any connection + // if the file is not open. + err = sftp_request(get_conn(sf, NULL), + sf == NULL ? SSH_FXP_SETSTAT : SSH_FXP_FSETSTAT, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; @@ -2493,7 +2664,8 @@ static int sshfs_utimens(const char *path, const struct timespec tv[2], buf_add_uint32(&buf, asec); buf_add_uint32(&buf, msec); - err = sftp_request(sf == NULL ? SSH_FXP_SETSTAT : SSH_FXP_FSETSTAT, + err = sftp_request(get_conn(sf, path), + sf == NULL ? SSH_FXP_SETSTAT : SSH_FXP_FSETSTAT, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; @@ -2549,7 +2721,19 @@ static int sshfs_open_common(const char *path, mode_t mode, sf->next_pos = 0; pthread_mutex_lock(&sshfs.lock); sf->modifver= sshfs.modifver; - sf->connver = sshfs.connver; + if (sshfs.max_conns > 1) { + sf->conn = g_hash_table_lookup(sshfs.conntab, path); + if (!sf->conn) { + sf->conn = get_conn(NULL, NULL); + g_hash_table_insert(sshfs.conntab, g_strdup(path), sf->conn); + } else { + assert(sf->conn->file_count > 0); + } + sf->conn->file_count++; + } else { + sf->conn = &sshfs.conns[0]; + } + sf->connver = sf->conn->connver; pthread_mutex_unlock(&sshfs.lock); buf_init(&buf, 0); buf_add_path(&buf, path); @@ -2557,12 +2741,12 @@ static int sshfs_open_common(const char *path, mode_t mode, buf_add_uint32(&buf, SSH_FILEXFER_ATTR_PERMISSIONS); buf_add_uint32(&buf, mode); buf_to_iov(&buf, &iov); - sftp_request_send(SSH_FXP_OPEN, &iov, 1, NULL, NULL, 1, NULL, + sftp_request_send(sf->conn, SSH_FXP_OPEN, &iov, 1, NULL, NULL, 1, NULL, &open_req); buf_clear(&buf); buf_add_path(&buf, path); type = sshfs.follow_symlinks ? SSH_FXP_STAT : SSH_FXP_LSTAT; - err2 = sftp_request(type, &buf, SSH_FXP_ATTRS, &outbuf); + err2 = sftp_request(sf->conn, type, &buf, SSH_FXP_ATTRS, &outbuf); if (!err2) { err2 = buf_get_attrs(&outbuf, &stbuf, NULL); buf_free(&outbuf); @@ -2571,7 +2755,7 @@ static int sshfs_open_common(const char *path, mode_t mode, &sf->handle); if (!err && err2) { buf_finish(&sf->handle); - sftp_request(SSH_FXP_CLOSE, &sf->handle, 0, NULL); + sftp_request(sf->conn, SSH_FXP_CLOSE, &sf->handle, 0, NULL); buf_free(&sf->handle); err = err2; } @@ -2584,6 +2768,13 @@ static int sshfs_open_common(const char *path, mode_t mode, } else { if (sshfs.dir_cache) cache_invalidate(path); + if (sshfs.max_conns > 1) { + pthread_mutex_lock(&sshfs.lock); + if(--sf->conn->file_count == 0) { + g_hash_table_remove(sshfs.conntab, path); + } + pthread_mutex_unlock(&sshfs.lock); + } g_free(sf); } buf_free(&buf); @@ -2642,7 +2833,7 @@ static int sshfs_fsync(const char *path, int isdatasync, buf_init(&buf, 0); buf_add_string(&buf, SFTP_EXT_FSYNC); buf_add_buf(&buf, &sf->handle); - err = sftp_request(SSH_FXP_EXTENDED, &buf, SSH_FXP_STATUS, NULL); + err = sftp_request(sf->conn, SSH_FXP_EXTENDED, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); return err; } @@ -2653,10 +2844,16 @@ static int sshfs_release(const char *path, struct fuse_file_info *fi) struct buffer *handle = &sf->handle; if (sshfs_file_is_conn(sf)) { sshfs_flush(path, fi); - sftp_request(SSH_FXP_CLOSE, handle, 0, NULL); + sftp_request(sf->conn, SSH_FXP_CLOSE, handle, 0, NULL); } buf_free(handle); chunk_put_locked(sf->readahead); + if (sshfs.max_conns > 1) { + sf->conn->file_count--; + if(!sf->conn->file_count) + g_hash_table_remove(sshfs.conntab, path); + pthread_mutex_unlock(&sshfs.lock); + } g_free(sf); return 0; } @@ -2736,7 +2933,7 @@ static struct read_chunk *sshfs_send_read(struct sshfs_file *sf, size_t size, buf_add_uint64(&buf, offset); buf_add_uint32(&buf, bsize); buf_to_iov(&buf, &iov[0]); - err = sftp_request_send(SSH_FXP_READ, iov, 1, + err = sftp_request_send(sf->conn, SSH_FXP_READ, iov, 1, sshfs_read_begin, sshfs_read_end, 0, rreq, NULL); @@ -2951,7 +3148,7 @@ static int sshfs_async_write(struct sshfs_file *sf, const char *wbuf, buf_to_iov(&buf, &iov[0]); iov[1].iov_base = (void *) wbuf; iov[1].iov_len = bsize; - err = sftp_request_send(SSH_FXP_WRITE, iov, 2, + err = sftp_request_send(sf->conn, SSH_FXP_WRITE, iov, 2, sshfs_write_begin, sshfs_write_end, 0, sf, NULL); buf_free(&buf); @@ -3011,7 +3208,7 @@ static int sshfs_sync_write(struct sshfs_file *sf, const char *wbuf, buf_to_iov(&buf, &iov[0]); iov[1].iov_base = (void *) wbuf; iov[1].iov_len = bsize; - err = sftp_request_send(SSH_FXP_WRITE, iov, 2, + err = sftp_request_send(sf->conn, SSH_FXP_WRITE, iov, 2, sshfs_sync_write_begin, sshfs_sync_write_end, 0, &sio, NULL); @@ -3061,8 +3258,8 @@ static int sshfs_ext_statvfs(const char *path, struct statvfs *stbuf) buf_init(&buf, 0); buf_add_string(&buf, SFTP_EXT_STATVFS); buf_add_path(&buf, path); - err = sftp_request(SSH_FXP_EXTENDED, &buf, SSH_FXP_EXTENDED_REPLY, - &outbuf); + err = sftp_request(get_conn(NULL, NULL), SSH_FXP_EXTENDED, &buf, + SSH_FXP_EXTENDED_REPLY, &outbuf); if (!err) { if (buf_get_statvfs(&outbuf, stbuf) == -1) err = -EIO; @@ -3112,7 +3309,7 @@ static int sshfs_truncate(const char *path, off_t size, if (!sshfs_file_is_conn(sf)) return -EIO; } - + sshfs_inc_modifver(); if (sshfs.truncate_workaround) return sshfs_truncate_workaround(path, size, fi); @@ -3126,7 +3323,8 @@ static int sshfs_truncate(const char *path, off_t size, buf_add_uint32(&buf, SSH_FILEXFER_ATTR_SIZE); buf_add_uint64(&buf, size); - err = sftp_request(sf == NULL ? SSH_FXP_SETSTAT : SSH_FXP_FSETSTAT, + err = sftp_request(get_conn(sf, path), + sf == NULL ? SSH_FXP_SETSTAT : SSH_FXP_FSETSTAT, &buf, SSH_FXP_STATUS, NULL); buf_free(&buf); @@ -3146,17 +3344,18 @@ static int sshfs_getattr(const char *path, struct stat *stbuf, if (!sshfs_file_is_conn(sf)) return -EIO; } - buf_init(&buf, 0); if(sf == NULL) { buf_add_path(&buf, path); - err = sftp_request(sshfs.follow_symlinks ? SSH_FXP_STAT : SSH_FXP_LSTAT, + err = sftp_request(get_conn(sf, path), + sshfs.follow_symlinks ? SSH_FXP_STAT : SSH_FXP_LSTAT, &buf, SSH_FXP_ATTRS, &outbuf); } else { buf_add_buf(&buf, &sf->handle); - err = sftp_request(SSH_FXP_FSTAT, &buf, SSH_FXP_ATTRS, &outbuf); + err = sftp_request(sf->conn, SSH_FXP_FSTAT, &buf, + SSH_FXP_ATTRS, &outbuf); } if (!err) { err = buf_get_attrs(&outbuf, stbuf, NULL); @@ -3290,16 +3489,27 @@ static int sshfs_truncate_workaround(const char *path, off_t size, static int processing_init(void) { + int i; + signal(SIGPIPE, SIG_IGN); pthread_mutex_init(&sshfs.lock, NULL); - pthread_mutex_init(&sshfs.lock_write, NULL); + for (i = 0; i < sshfs.max_conns; i++) + pthread_mutex_init(&sshfs.conns[i].lock_write, NULL); pthread_cond_init(&sshfs.outstanding_cond, NULL); sshfs.reqtab = g_hash_table_new(NULL, NULL); if (!sshfs.reqtab) { fprintf(stderr, "failed to create hash table\n"); return -1; } + if (sshfs.max_conns > 1) { + sshfs.conntab = g_hash_table_new_full(g_str_hash, g_str_equal, + g_free, NULL); + if (!sshfs.conntab) { + fprintf(stderr, "failed to create hash table\n"); + return -1; + } + } return 0; } @@ -3393,6 +3603,7 @@ static void usage(const char *progname) " -o follow_symlinks follow symlinks on the server\n" " -o no_check_root don't check for existence of 'dir' on server\n" " -o password_stdin read password from stdin (only for pam_mount!)\n" +" -o max_conns=N open parallel SSH connections\n" " -o SSHOPT=VAL ssh options (see man ssh_config)\n" "\n" "FUSE Options:\n", @@ -3706,11 +3917,11 @@ static int ssh_connect(void) return -1; if (!sshfs.delay_connect) { - if (connect_remote() == -1) + if (connect_remote(&sshfs.conns[0]) == -1) return -1; if (!sshfs.no_check_root && - sftp_check_root(sshfs.base_path) != 0) + sftp_check_root(&sshfs.conns[0], sshfs.base_path) != 0) return -1; } @@ -3905,6 +4116,7 @@ int main(int argc, char *argv[]) const char *sftp_server; struct fuse *fuse; struct fuse_session *se; + int i; #ifdef __APPLE__ if (!realpath(*exec_path, sshfs_program_path)) { @@ -3931,8 +4143,7 @@ int main(int argc, char *argv[]) sshfs.createmode_workaround = 0; sshfs.ssh_ver = 2; sshfs.progname = argv[0]; - sshfs.rfd = -1; - sshfs.wfd = -1; + sshfs.max_conns = 1; sshfs.ptyfd = -1; sshfs.dir_cache = 1; sshfs.show_help = 0; @@ -4024,7 +4235,7 @@ int main(int argc, char *argv[]) if (sshfs.debug) sshfs.foreground = 1; - + if (sshfs.buflimit_workaround) /* Work around buggy sftp-server in OpenSSH. Without this on a slow server a 10Mbyte buffer would fill up and the server @@ -4033,6 +4244,32 @@ int main(int argc, char *argv[]) else sshfs.max_outstanding_len = ~0; + if (sshfs.max_conns > 1) { + if (sshfs.buflimit_workaround) { + fprintf(stderr, "buflimit workaround is not supported with parallel connections\n"); + exit(1); + } + + if (sshfs.password_stdin) { + fprintf(stderr, "password_stdin option cannot be specified with parallel connections\n"); + exit(1); + } + + if (sshfs.slave) { + fprintf(stderr, "slave option cannot be specified with parallel connections\n"); + exit(1); + } + } else if (sshfs.max_conns <= 0) { + fprintf(stderr, "value of max_conns option must be at least 1\n"); + exit(1); + } + + sshfs.conns = g_new0(struct conn, sshfs.max_conns); + for (i = 0; i < sshfs.max_conns; i++) { + sshfs.conns[i].rfd = -1; + sshfs.conns[i].wfd = -1; + } + fsname = g_strdup(sshfs.host); sshfs.base_path = g_strdup(find_base_path()); diff --git a/sshfs.rst b/sshfs.rst index e3222f12..0fe84f29 100644 --- a/sshfs.rst +++ b/sshfs.rst @@ -218,6 +218,14 @@ Options for example if the file size is not known in advance (before reading it). e.g. /proc filesystem +-o max_conns=N + sets the maximum number of simultaneous SSH connections + to use. Each connection is established with a separate SSH process. + The primary purpose of this feature is to improve the responsiveness of the + file system during large file transfers. When using more than once + connection, the *password_stdin* and *slave* options can not be + used, and the *buflimit* workaround is not supported/ + In addition, SSHFS accepts several options common to all FUSE file systems. These are described in the `mount.fuse` manpage (look for "general", "libfuse specific", and "high-level API" options). diff --git a/test/test_sshfs.py b/test/test_sshfs.py index 995ff00c..d873a63a 100755 --- a/test/test_sshfs.py +++ b/test/test_sshfs.py @@ -33,7 +33,8 @@ def name_generator(__ctr=[0]): @pytest.mark.parametrize("debug", (False, True)) @pytest.mark.parametrize("cache_timeout", (0,1)) @pytest.mark.parametrize("sync_rd", (True, False)) -def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, capfd): +@pytest.mark.parametrize("multiconn", (True,False)) +def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, multiconn, capfd): # Avoid false positives from debug messages #if debug: @@ -77,6 +78,9 @@ def test_sshfs(tmpdir, debug, cache_timeout, sync_rd, capfd): cmdline += [ '-o', 'entry_timeout=0', '-o', 'attr_timeout=0' ] + if multiconn: + cmdline += [ '-o', 'max_conns=3', + '-o', 'workaround=nobuflimit' ] new_env = dict(os.environ) # copy, don't modify