Skip to content

Commit

Permalink
add -L limit bitrate option (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
upa committed Apr 10, 2024
1 parent 9b8ba69 commit 4caaaf2
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 25 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ list(APPEND MSCP_BUILD_INCLUDE_DIRS ${CMAKE_CURRENT_BINARY_DIR}/include)
# libmscp.a
set(LIBMSCP_SRC
src/mscp.c src/ssh.c src/fileops.c src/path.c src/checkpoint.c
src/platform.c src/print.c src/pool.c src/strerrno.c
src/bwlimit.c src/platform.c src/print.c src/pool.c src/strerrno.c
${OPENBSD_COMPAT_SRC})
add_library(mscp-static STATIC ${LIBMSCP_SRC})
target_include_directories(mscp-static
Expand Down Expand Up @@ -203,7 +203,7 @@ foreach(x RANGE ${DIST_LISTLEN})
COMMENT "Test mscp in ${DOCKER_IMAGE} container"
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
COMMAND
${CE} run --init --rm --sysctl net.ipv6.conf.all.disable_ipv6=0
${CE} run --init --rm -it --sysctl net.ipv6.conf.all.disable_ipv6=0
${DOCKER_IMAGE} /mscp/scripts/test-in-container.sh)

list(APPEND DOCKER_BUILDS docker-build-${DOCKER_INDEX})
Expand Down
8 changes: 8 additions & 0 deletions doc/mscp.1.in
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ mscp \- copy files over multiple SSH connections
.BI \-b \ BUF_SIZE\c
]
[\c
.BI \-L \ LIMIT_BITRATE\c
]
[\c
.BI \-l \ LOGIN_NAME\c
]
[\c
Expand Down Expand Up @@ -198,6 +201,11 @@ Specifies the buffer size for I/O and transfer over SFTP. The default
value is 16384. Note that the SSH specification restricts buffer size
delivered over SSH. Changing this value is not recommended at present.

.TP
.B \-L \fILIMIT_BITRATE\fR
Limits the bitrate, specified with k (K), m (M), and g (G), e.g., 100m
indicates 100 Mbps.

.TP
.B \-4
Uses IPv4 addresses only.
Expand Down
22 changes: 13 additions & 9 deletions doc/mscp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
MSCP
====

:Date: v0.1.4-28-g0d248c5
:Date: v0.1.5-4-g9b8ba69

NAME
====
Expand All @@ -12,14 +12,14 @@ mscp - copy files over multiple SSH connections
SYNOPSIS
========

**mscp** [**-46vqDpHdNh**] [ **-n**\ *NR_CONNECTIONS* ] [
**-m**\ *COREMASK* ] [ **-u**\ *MAX_STARTUPS* ] [ **-I**\ *INTERVAL* ] [
**-W**\ *CHECKPOINT* ] [ **-R**\ *CHECKPOINT* ] [
**-s**\ *MIN_CHUNK_SIZE* ] [ **-S**\ *MAX_CHUNK_SIZE* ] [
**-a**\ *NR_AHEAD* ] [ **-b**\ *BUF_SIZE* ] [ **-l**\ *LOGIN_NAME* ] [
**-P**\ *PORT* ] [ **-F**\ *CONFIG* ] [ **-i**\ *IDENTITY* ] [
**-c**\ *CIPHER* ] [ **-M**\ *HMAC* ] [ **-C**\ *COMPRESS* ] [
**-g**\ *CONGESTION* ] *source ... target*
**mscp** [**-46vqDpHdNh**] [ **-n** *NR_CONNECTIONS* ] [ **-m**
*COREMASK* ] [ **-u** *MAX_STARTUPS* ] [ **-I** *INTERVAL* ] [ **-W**
*CHECKPOINT* ] [ **-R** *CHECKPOINT* ] [ **-s** *MIN_CHUNK_SIZE* ] [
**-S** *MAX_CHUNK_SIZE* ] [ **-a** *NR_AHEAD* ] [ **-b** *BUF_SIZE* ] [
**-L** *LIMIT_BITRATE* ] [ **-l** *LOGIN_NAME* ] [ **-P** *PORT* ] [
**-F** *CONFIG* ] [ **-i** *IDENTITY* ] [ **-c** *CIPHER* ] [ **-M**
*HMAC* ] [ **-C** *COMPRESS* ] [ **-g** *CONGESTION* ] *source ...
target*

DESCRIPTION
===========
Expand Down Expand Up @@ -111,6 +111,10 @@ OPTIONS
delivered over SSH. Changing this value is not recommended at
present.

**-L LIMIT_BITRATE**
Limits the bitrate, specified with k (K), m (M), and g (G), e.g.,
100m indicates 100 Mbps.

**-4**
Uses IPv4 addresses only.

Expand Down
3 changes: 2 additions & 1 deletion include/mscp.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ struct mscp_opts {
size_t min_chunk_sz; /** minimum chunk size (default 64MB) */
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
size_t buf_sz; /** buffer size, default 16k. */
char *coremask; /** hex to specifiy usable cpu cores */
size_t bitrate; /** bits-per-seconds to limit bandwidth */
char *coremask; /** hex to specifiy usable cpu cores */
int max_startups; /** sshd MaxStartups concurrent connections */
int interval; /** interval between SSH connection attempts */
bool preserve_ts; /** preserve file timestamps */
Expand Down
94 changes: 94 additions & 0 deletions src/bwlimit.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/* SPDX-License-Identifier: GPL-3.0-only */
#include <errno.h>

#include <bwlimit.h>
#include <platform.h>

#define timespeczerorize(ts) \
do { \
ts.tv_sec = 0; \
ts.tv_nsec = 0; \
} while (0)

int bwlimit_init(struct bwlimit *bw, uint64_t bps, uint64_t win)
{
if (!(bw->sem = sem_create(1)))
return -1;

bw->bps = bps;
bw->win = win; /* msec window */
bw->amt = (double)bps / 8 / 1000 * win; /* bytes in a window (msec) */
bw->credit = bw->amt;
timespeczerorize(bw->wstart);
timespeczerorize(bw->wend);

return 0;
}

#define timespecisset(ts) ((ts).tv_sec || (ts).tv_nsec)

#define timespecmsadd(a, msec, r) \
do { \
(r).tv_sec = (a).tv_sec; \
(r).tv_nsec = (a).tv_nsec + (msec * 1000000); \
if ((r).tv_nsec > 1000000000) { \
(r).tv_sec += (r.tv_nsec) / 1000000000L; \
(r).tv_nsec = (r.tv_nsec) % 1000000000L; \
} \
} while (0)

#define timespecsub(a, b, r) \
do { \
(r).tv_sec = (a).tv_sec - (b).tv_sec; \
(r).tv_nsec = (a).tv_nsec - (b).tv_nsec; \
if ((r).tv_nsec < 0) { \
(r).tv_sec -= 1; \
(r).tv_nsec += 1000000000; \
} \
} while (0)

#define timespeccmp(a, b, expr) \
((a.tv_sec * 1000000000 + a.tv_nsec) expr(b.tv_sec * 1000000000 + b.tv_nsec))

#include <stdio.h>

int bwlimit_wait(struct bwlimit *bw, size_t nr_bytes)
{
struct timespec now, end, rq, rm;

if (bw->bps == 0)
return 0; /* no bandwidth limit */

if (sem_wait(bw->sem) < 0)
return -1;

clock_gettime(CLOCK_MONOTONIC, &now);

if (!timespecisset(bw->wstart)) {
bw->wstart = now;
timespecmsadd(bw->wstart, bw->win, bw->wend);
}

bw->credit -= nr_bytes;

if (bw->credit < 0) {
/* no more credit on this window. sleep until the end
* of this windown and additional time for the
* remaining bytes. */
uint64_t addition = (double)(bw->credit * -1) / (bw->bps / 8);
timespecmsadd(bw->wend, addition * 1000, end);
if (timespeccmp(end, now, >)) {
timespecsub(end, now, rq);
while (nanosleep(&rq, &rm) == -1) {
if (errno != EINTR)
break;
rq = rm;
}
}
bw->credit = bw->amt;
timespeczerorize(bw->wstart);
}

sem_post(bw->sem);
return 0;
}
27 changes: 27 additions & 0 deletions src/bwlimit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* SPDX-License-Identifier: GPL-3.0-only */
#ifndef _BWLIMIT_H_
#define _BWLIMIT_H_

#include <stdbool.h>
#include <stdint.h>
#include <time.h>
#include <semaphore.h>

struct bwlimit {
sem_t *sem; /* semaphore */
uint64_t bps; /* limit bit-rate (bps) */
uint64_t win; /* window size (msec) */
size_t amt; /* amount of bytes can be sent in a window */

ssize_t credit; /* remaining bytes can be sent in a window */
struct timespec wstart, wend; /* window start time and end time */
};

int bwlimit_init(struct bwlimit *bw, uint64_t bps, uint64_t win);
/* if bps is 0, it means that bwlimit is not active. If so,
* bwlimit_wait() returns immediately. */

int bwlimit_wait(struct bwlimit *bw, size_t nr_bytes);


#endif /* _BWLIMIT_H_ */
27 changes: 25 additions & 2 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ void usage(bool print_help)
"\n"
"Usage: mscp [-46vqDpHdNh] [-n nr_conns] [-m coremask]\n"
" [-u max_startups] [-I interval] [-W checkpoint] [-R checkpoint]\n"
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]\n"
" [-b buf_sz] [-L limit_bitrate]\n"
" [-l login_name] [-P port] [-F ssh_config] [-i identity_file]\n"
" [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n"
" source ... target\n"
Expand All @@ -48,6 +49,7 @@ void usage(bool print_help)
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
" -a NR_AHEAD number of inflight SFTP commands (default: 32)\n"
" -b BUF_SZ buffer size for i/o and transfer\n"
" -L LIMIT_BITRATE Limit the bitrate, n[KMG] (default: 0)\n"
"\n"
" -4 use IPv4\n"
" -6 use IPv6\n"
Expand Down Expand Up @@ -266,12 +268,14 @@ int main(int argc, char **argv)
int direction = 0;
char *remote = NULL, *checkpoint_save = NULL, *checkpoint_load = NULL;
bool dryrun = false, resume = false;
char *u;
size_t mag = 0;

memset(&s, 0, sizeof(s));
memset(&o, 0, sizeof(o));
o.severity = MSCP_SEVERITY_WARN;

#define mscpopts "n:m:u:I:W:R:s:S:a:b:46vqDrl:P:i:F:c:M:C:g:pHdNh"
#define mscpopts "n:m:u:I:W:R:s:S:a:b:L:46vqDrl:P:i:F:c:M:C:g:pHdNh"
while ((ch = getopt(argc, argv, mscpopts)) != -1) {
switch (ch) {
case 'n':
Expand Down Expand Up @@ -309,6 +313,25 @@ int main(int argc, char **argv)
case 'b':
o.buf_sz = atoi(optarg);
break;
case 'L':
u = optarg + (strlen(optarg) - 1);
if (*u == 'k' || *u == 'K') {
mag = 1000;
*u = '\0';
} else if (*u == 'm' || *u == 'M') {
mag = 1000000;
*u = '\0';
} else if (*u == 'g' || *u == 'G') {
mag = 1000000000;
*u = '\0';
}
o.bitrate = atol(optarg);
if (o.bitrate == 0) {
pr_err("invalid bitrate: %s", optarg);
return 1;
}
o.bitrate *= mag;
break;
case '4':
s.ai_family = AF_INET;
break;
Expand Down
15 changes: 12 additions & 3 deletions src/mscp.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <print.h>
#include <strerrno.h>
#include <mscp.h>
#include <bwlimit.h>

#include <openbsd-compat/openbsd-compat.h>

Expand Down Expand Up @@ -56,6 +57,8 @@ struct mscp {
#define chunk_pool_is_ready(m) ((m)->chunk_pool_ready)
#define chunk_pool_set_ready(m, b) ((m)->chunk_pool_ready = b)

struct bwlimit bw; /* bandwidth limit mechanism */

struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
};

Expand Down Expand Up @@ -281,6 +284,12 @@ struct mscp *mscp_init(struct mscp_opts *o, struct mscp_ssh_opts *s)
pr_notice("usable cpu cores:%s", b);
}

if (bwlimit_init(&m->bw, o->bitrate, 100) < 0) { /* 50ms window (hardcoded) */
priv_set_errv("bwlimit_init: %s", strerrno());
goto free_out;
}
pr_notice("bitrate limit: %lu bps", o->bitrate);

return m;

free_out:
Expand Down Expand Up @@ -522,8 +531,8 @@ int mscp_checkpoint_load(struct mscp *m, const char *pathname)

int mscp_checkpoint_save(struct mscp *m, const char *pathname)
{
return checkpoint_save(pathname, m->direction, m->ssh_opts->login_name,
m->remote, m->path_pool, m->chunk_pool);
return checkpoint_save(pathname, m->direction, m->ssh_opts->login_name, m->remote,
m->path_pool, m->chunk_pool);
}

static void *mscp_copy_thread(void *arg);
Expand Down Expand Up @@ -712,7 +721,7 @@ void *mscp_copy_thread(void *arg)
}

if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
m->opts->buf_sz, m->opts->preserve_ts,
m->opts->buf_sz, m->opts->preserve_ts, &m->bw,
&t->copied_bytes)) < 0)
break;
}
Expand Down
Loading

0 comments on commit 4caaaf2

Please sign in to comment.