Skip to content

Commit

Permalink
Merge pull request Upipe#11 from kierank/srt-new
Browse files Browse the repository at this point in the history
SRT fixes
  • Loading branch information
funman committed Mar 21, 2024
2 parents 8b67f66 + 9f55d31 commit fdfc52b
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 30 deletions.
9 changes: 8 additions & 1 deletion examples/rist_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static void usage(const char *argv0) {
fprintf(stdout, " -d: more verbose\n");
fprintf(stdout, " -q: more quiet\n");
fprintf(stdout, " -k encryption password\n");
fprintf(stdout, " -i stream_id\n");
fprintf(stdout, " -l key length in bits\n");
exit(EXIT_FAILURE);
}
Expand All @@ -95,6 +96,7 @@ static char *srcpath;
static char *dirpath;
static char *latency;
static char *password;
static char *stream_id;
static int key_length = 128;

static enum uprobe_log_level loglevel = UPROBE_LOG_DEBUG;
Expand Down Expand Up @@ -240,6 +242,8 @@ static int start(void)
if (!ubase_check(upipe_set_option(upipe_srt_handshake, "latency", latency)))
return EXIT_FAILURE;
upipe_srt_handshake_set_password(upipe_srt_handshake, password, key_length / 8);
if (stream_id)
upipe_set_option(upipe_srt_handshake, "stream_id", stream_id);

upipe_mgr_release(upipe_srt_handshake_mgr);

Expand Down Expand Up @@ -331,7 +335,7 @@ int main(int argc, char *argv[])
int opt;

/* parse options */
while ((opt = getopt(argc, argv, "qdk:l:")) != -1) {
while ((opt = getopt(argc, argv, "qdk:i:l:")) != -1) {
switch (opt) {
case 'q':
loglevel++;
Expand All @@ -343,6 +347,9 @@ int main(int argc, char *argv[])
case 'k':
password = optarg;
break;
case 'i':
stream_id = optarg;
break;
case 'l':
key_length = atoi(optarg);
break;
Expand Down
94 changes: 68 additions & 26 deletions lib/upipe-srt/upipe_srt_handshake.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct upipe_srt_handshake {
struct urefcount urefcount;

struct upump_mgr *upump_mgr;
struct upump *upump_timer; /* send handshakes every 250ms until connected */
struct upump *upump_handshake_send; /* send handshakes every 250ms until connected */
struct upump *upump_handshake_timeout; /* abort connection if not successful */
struct upump *upump_keepalive_timeout; /* reset connection if no keep alive in 10s */
struct upump *upump_kmreq; /* re-send key update if not acknowledged */
Expand Down Expand Up @@ -121,6 +121,7 @@ struct upipe_srt_handshake {
uint64_t establish_time;

bool expect_conclusion;
struct uref *caller_conclusion;

bool listener;

Expand All @@ -140,7 +141,7 @@ UPIPE_HELPER_VOID(upipe_srt_handshake)

UPIPE_HELPER_OUTPUT(upipe_srt_handshake, output, flow_def, output_state, request_list)
UPIPE_HELPER_UPUMP_MGR(upipe_srt_handshake, upump_mgr)
UPIPE_HELPER_UPUMP(upipe_srt_handshake, upump_timer, upump_mgr)
UPIPE_HELPER_UPUMP(upipe_srt_handshake, upump_handshake_send, upump_mgr)
UPIPE_HELPER_UPUMP(upipe_srt_handshake, upump_handshake_timeout, upump_mgr)
UPIPE_HELPER_UPUMP(upipe_srt_handshake, upump_keepalive_timeout, upump_mgr)
UPIPE_HELPER_UPUMP(upipe_srt_handshake, upump_kmreq, upump_mgr)
Expand Down Expand Up @@ -216,7 +217,7 @@ static void upipe_srt_handshake_shutdown(struct upipe *upipe)

uref_block_unmap(uref, 0);
upipe_srt_handshake_output(&upipe_srt_handshake->upipe, uref,
&upipe_srt_handshake->upump_timer);
&upipe_srt_handshake->upump_handshake_send);
}


Expand Down Expand Up @@ -315,33 +316,60 @@ static void upipe_srt_handshake_timeout(struct upump *upump)
upipe_srt_handshake->expect_conclusion = false;
}

static void upipe_srt_handshake_timer(struct upump *upump)
static void upipe_srt_handshake_send_timer(struct upump *upump)
{
struct upipe *upipe = upump_get_opaque(upump, struct upipe *);
struct upipe_srt_handshake *upipe_srt_handshake = upipe_srt_handshake_from_upipe(upipe);

uint64_t now = uclock_now(upipe_srt_handshake->uclock);
if (!upipe_srt_handshake->establish_time)
upipe_srt_handshake->establish_time = now;
uint32_t timestamp = (now - upipe_srt_handshake->establish_time) / 27;

/* 250 ms between handshakes, just like libsrt */
if (now - upipe_srt_handshake->last_hs_sent < UCLOCK_FREQ / 4)
return;

//send HS
uint8_t *out_cif;
struct uref *uref = upipe_srt_handshake_alloc_hs(upipe, 0, 0, &out_cif);
if (!uref)
return;
if (upipe_srt_handshake->expect_conclusion) {
if (upipe_srt_handshake->caller_conclusion) {
struct uref *uref = uref_dup(upipe_srt_handshake->caller_conclusion);
/* copy because we need to rewrite the timestamp */
struct ubuf *ubuf = ubuf_block_copy(uref->ubuf->mgr, uref->ubuf, 0, -1);
if (!ubuf) {
upipe_err_va(upipe, "Malloc failed");
return;
}
uref_attach_ubuf(uref, ubuf);

upipe_srt_handshake->establish_time = now;
uint8_t *out;
int output_size = -1;
if (unlikely(!ubase_check(uref_block_write(uref, 0, &output_size, &out)))) {
uref_free(uref);
upipe_throw_fatal(upipe, UBASE_ERR_ALLOC);
return;
}
srt_set_packet_timestamp(out, timestamp);
uref_block_unmap(uref, 0);

srt_set_handshake_version(out_cif, SRT_HANDSHAKE_VERSION_MIN); // XXX
srt_set_handshake_extension(out_cif, SRT_HANDSHAKE_EXT_KMREQ); // draft-sharabayko-srt-01#section-4.3.1.1 * Extension Field: 2
srt_set_handshake_type(out_cif, SRT_HANDSHAKE_TYPE_INDUCTION);
upipe_srt_handshake_output(&upipe_srt_handshake->upipe, uref,
&upipe_srt_handshake->upump_handshake_send);
}
} else {
//send HS
uint8_t *out_cif;
struct uref *uref = upipe_srt_handshake_alloc_hs(upipe, 0, timestamp, &out_cif);
if (!uref)
return;

uref_block_unmap(uref, 0);
srt_set_handshake_version(out_cif, SRT_HANDSHAKE_VERSION_MIN); // XXX
srt_set_handshake_extension(out_cif, SRT_HANDSHAKE_EXT_KMREQ); // draft-sharabayko-srt-01#section-4.3.1.1 * Extension Field: 2
srt_set_handshake_type(out_cif, SRT_HANDSHAKE_TYPE_INDUCTION);

upipe_srt_handshake_output(&upipe_srt_handshake->upipe, uref,
&upipe_srt_handshake->upump_timer);
uref_block_unmap(uref, 0);

upipe_srt_handshake_output(&upipe_srt_handshake->upipe, uref,
&upipe_srt_handshake->upump_handshake_send);
}
upipe_srt_handshake->last_hs_sent = now;
}

Expand Down Expand Up @@ -380,7 +408,7 @@ static struct upipe *upipe_srt_handshake_alloc(struct upipe_mgr *mgr,
upipe_srt_handshake_init_output(upipe);

upipe_srt_handshake_init_upump_mgr(upipe);
upipe_srt_handshake_init_upump_timer(upipe);
upipe_srt_handshake_init_upump_handshake_send(upipe);
upipe_srt_handshake_init_upump_handshake_timeout(upipe);
upipe_srt_handshake_init_upump_keepalive_timeout(upipe);
upipe_srt_handshake_init_upump_kmreq(upipe);
Expand All @@ -397,6 +425,7 @@ static struct upipe *upipe_srt_handshake_alloc(struct upipe_mgr *mgr,
upipe_srt_handshake->last_hs_sent = 0;

upipe_srt_handshake->expect_conclusion = false;
upipe_srt_handshake->caller_conclusion = NULL;

upipe_srt_handshake->stream_id = NULL;
upipe_srt_handshake->stream_id_len = 0;
Expand All @@ -418,6 +447,8 @@ static struct upipe *upipe_srt_handshake_alloc(struct upipe_mgr *mgr,
upipe_srt_handshake->kmreq = NULL;
upipe_srt_handshake->password = NULL;

upipe_srt_handshake->establish_time = 0;

upipe_throw_ready(upipe);
return upipe;
}
Expand Down Expand Up @@ -455,16 +486,16 @@ static int upipe_srt_handshake_check(struct upipe *upipe, struct uref *flow_form
return UBASE_ERR_NONE;
}

if (upipe_srt_handshake->upump_mgr && !upipe_srt_handshake->upump_keepalive_timeout && !upipe_srt_handshake->upump_timer && !upipe_srt_handshake->listener) {
if (upipe_srt_handshake->upump_mgr && !upipe_srt_handshake->upump_keepalive_timeout && !upipe_srt_handshake->upump_handshake_send && !upipe_srt_handshake->listener) {
upipe_srt_handshake->socket_id = mrand48();
upipe_srt_handshake->syn_cookie = 0;
struct upump *upump =
upump_alloc_timer(upipe_srt_handshake->upump_mgr,
upipe_srt_handshake_timer,
upipe_srt_handshake_send_timer,
upipe, upipe->refcount,
UCLOCK_FREQ/300, UCLOCK_FREQ/300);
upump_start(upump);
upipe_srt_handshake_set_upump_timer(upipe, upump);
upipe_srt_handshake_set_upump_handshake_send(upipe, upump);
}

return UBASE_ERR_NONE;
Expand Down Expand Up @@ -568,7 +599,7 @@ static int _upipe_srt_handshake_control(struct upipe *upipe,

switch (command) {
case UPIPE_ATTACH_UPUMP_MGR:
upipe_srt_handshake_set_upump_timer(upipe, NULL);
upipe_srt_handshake_set_upump_handshake_send(upipe, NULL);
upipe_srt_handshake_set_upump_handshake_timeout(upipe, NULL);
upipe_srt_handshake_set_upump_keepalive_timeout(upipe, NULL);
upipe_srt_handshake_set_upump_kmreq(upipe, NULL);
Expand Down Expand Up @@ -1049,7 +1080,7 @@ static struct uref *upipe_srt_handshake_handle_hs_caller_conclusion(struct upipe
{
struct upipe_srt_handshake *upipe_srt_handshake = upipe_srt_handshake_from_upipe(upipe);

upipe_srt_handshake_set_upump_timer(upipe, NULL);
upipe_srt_handshake_set_upump_handshake_send(upipe, NULL);
upipe_srt_handshake->remote_socket_id = hs_packet->remote_socket_id;

/* At least HSREQ is expected */
Expand Down Expand Up @@ -1332,10 +1363,10 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
bool conclusion = upipe_srt_handshake->expect_conclusion;

if (conclusion) {
/* Don't send a rejection as it could be a duplicate Induction on a long latency link */
if (hs_type != SRT_HANDSHAKE_TYPE_CONCLUSION) {
upipe_err_va(upipe, "Expected conclusion, ignore hs type 0x%x", hs_type);
return upipe_srt_handshake_alloc_hs_reject(upipe, timestamp,
hs_packet.remote_socket_id, SRT_HANDSHAKE_TYPE_REJ_UNKNOWN);
upipe_dbg_va(upipe, "Expected conclusion, ignore hs type 0x%x", hs_type);
return NULL;
}
} else {
if (hs_type != SRT_HANDSHAKE_TYPE_INDUCTION) {
Expand All @@ -1360,6 +1391,11 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
if (!upipe_srt_handshake->listener) {
if (conclusion) {
uref = upipe_srt_handshake_handle_hs_caller_conclusion(upipe, size, timestamp, &hs_packet);
/* We don't need the cached conclusion packet any more */
if (upipe_srt_handshake->caller_conclusion) {
uref_free(upipe_srt_handshake->caller_conclusion);
upipe_srt_handshake->caller_conclusion = NULL;
}
} else {
if (hs_packet.version != SRT_HANDSHAKE_VERSION || hs_packet.dst_socket_id != upipe_srt_handshake->socket_id) {
upipe_err_va(upipe, "Malformed handshake (%08x != %08x)",
Expand All @@ -1368,6 +1404,9 @@ static struct uref *upipe_srt_handshake_handle_hs(struct upipe *upipe, const uin
hs_packet.remote_socket_id, SRT_HANDSHAKE_TYPE_REJ_UNKNOWN);
}
uref = upipe_srt_handshake_handle_hs_caller_induction(upipe, size, timestamp, &hs_packet);
if (upipe_srt_handshake->caller_conclusion)
uref_free(upipe_srt_handshake->caller_conclusion);
upipe_srt_handshake->caller_conclusion = uref_dup(uref);
}
} else { /* listener */
if (conclusion) {
Expand Down Expand Up @@ -1645,8 +1684,11 @@ static void upipe_srt_handshake_free(struct upipe *upipe)
if (upipe_srt_handshake->kmreq)
uref_free(upipe_srt_handshake->kmreq);

if (upipe_srt_handshake->caller_conclusion)
uref_free(upipe_srt_handshake->caller_conclusion);

upipe_srt_handshake_clean_output(upipe);
upipe_srt_handshake_clean_upump_timer(upipe);
upipe_srt_handshake_clean_upump_handshake_send(upipe);
upipe_srt_handshake_clean_upump_handshake_timeout(upipe);
upipe_srt_handshake_clean_upump_keepalive_timeout(upipe);
upipe_srt_handshake_clean_upump_kmreq(upipe);
Expand Down
1 change: 1 addition & 0 deletions lib/upipe-srt/upipe_srt_receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,7 @@ static void upipe_srt_receiver_input(struct upipe *upipe, struct uref *uref,
// XXX : when much too late, it could mean RTP source restart
upipe_err_va(upipe, "LATE packet %u, dropped (buffered %"PRIu64" -> %"PRIu64")",
seqnum, first_seq, last_seq);
uref_free(uref);
}

/** @This frees a upipe.
Expand Down
6 changes: 3 additions & 3 deletions lib/upipe-srt/upipe_srt_sender.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,9 @@ static void upipe_srt_sender_input_sub(struct upipe *upipe, struct uref *uref,
uref_block_unmap(uref, 0);
uref_free(uref);
} else {
if (type == SRT_CONTROL_TYPE_HANDSHAKE) {
if (type == SRT_CONTROL_TYPE_HANDSHAKE && upipe_srt_sender->establish_time == 0 && now > 0) {
uint64_t ts = srt_get_packet_timestamp(buf);
if (ts)
upipe_srt_sender->establish_time = now - ts * UCLOCK_FREQ / 1000000;
upipe_srt_sender->establish_time = now - ts * UCLOCK_FREQ / 1000000;
}

uref_block_unmap(uref, 0);
Expand Down Expand Up @@ -595,6 +594,7 @@ static struct upipe *upipe_srt_sender_alloc(struct upipe_mgr *mgr,
upipe_srt_sender->latency = UCLOCK_FREQ; /* 1 sec */
upipe_srt_sender->socket_id = 0;
upipe_srt_sender->seqnum = 0;
upipe_srt_sender->establish_time = 0;
upipe_srt_sender->syn_cookie = 1;

upipe_srt_sender->sek_len[0] = 0;
Expand Down

0 comments on commit fdfc52b

Please sign in to comment.