Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Use the connection limit also for DVR, fixes #2485
  • Loading branch information
perexg committed May 20, 2015
1 parent 2169a80 commit 808fd47
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 27 deletions.
3 changes: 2 additions & 1 deletion docs/html/config_access.html
Expand Up @@ -84,7 +84,8 @@

<dt><b>Limit Connections</b>
<dd>
If set, this will limit the number of concurrent streaming connections a user is permitted to have. 0=disabled
If set, this will limit the number of concurrent streaming connections and
DVR sessions a user is permitted to have. 0=disabled

<dt><b>Video Recorder</b>
<dd>
Expand Down
30 changes: 28 additions & 2 deletions src/access.c
Expand Up @@ -486,7 +486,7 @@ access_get(const char *username, const char *password, struct sockaddr *src)
a->aa_representative = strdup(username);
} else {
a->aa_representative = malloc(50);
tcp_get_ip_str((struct sockaddr*)src, a->aa_representative, 50);
tcp_get_str_from_ip((struct sockaddr*)src, a->aa_representative, 50);
}

if (access_noacl) {
Expand Down Expand Up @@ -555,7 +555,7 @@ access_get_hashed(const char *username, const uint8_t digest[20],
a->aa_representative = strdup(username);
} else {
a->aa_representative = malloc(50);
tcp_get_ip_str((struct sockaddr*)src, a->aa_representative, 50);
tcp_get_str_from_ip((struct sockaddr*)src, a->aa_representative, 50);
}

if(access_noacl) {
Expand Down Expand Up @@ -617,7 +617,33 @@ access_get_hashed(const char *username, const uint8_t digest[20],
return a;
}

/**
*
*/
access_t *
access_get_by_username(const char *username)
{
access_t *a = calloc(1, sizeof(*a));
access_entry_t *ae;

if(access_noacl) {
a->aa_rights = ACCESS_FULL;
return a;
}

TAILQ_FOREACH(ae, &access_entries, ae_link) {

if(!ae->ae_enabled)
continue;

if(ae->ae_username[0] == '*' || strcmp(ae->ae_username, username))
continue;

access_update(a, ae);
}

return a;
}

/**
*
Expand Down
6 changes: 6 additions & 0 deletions src/access.h
Expand Up @@ -183,6 +183,12 @@ access_t *
access_get_hashed(const char *username, const uint8_t digest[20],
const uint8_t *challenge, struct sockaddr *src);

/**
*
*/
access_t *
access_get_by_username(const char *username);

/**
*
*/
Expand Down
2 changes: 2 additions & 0 deletions src/dvr/dvr.h
Expand Up @@ -393,6 +393,8 @@ void dvr_config_destroy_by_profile(profile_t *pro, int delconf);

void dvr_make_title(char *output, size_t outlen, dvr_entry_t *de);

uint32_t dvr_usage_count(access_t *aa);

static inline int dvr_entry_is_editable(dvr_entry_t *de)
{ return de->de_sched_state == DVR_SCHEDULED; }

Expand Down
22 changes: 21 additions & 1 deletion src/dvr/dvr_db.c
Expand Up @@ -341,6 +341,27 @@ dvr_make_title(char *output, size_t outlen, dvr_entry_t *de)
}
}

/**
*
*/
uint32_t
dvr_usage_count(access_t *aa)
{
dvr_entry_t *de;
uint32_t used = 0;

LIST_FOREACH(de, &dvrentries, de_global_link) {
if (de->de_owner && de->de_owner[0]) {
if (!strcmp(de->de_owner, aa->aa_username ?: ""))
used++;
} else if (!strcmp(de->de_creator ?: "", aa->aa_representative ?: "")) {
used++;
}
}

return used;
}

static void
dvr_entry_set_timer(dvr_entry_t *de)
{
Expand Down Expand Up @@ -498,7 +519,6 @@ dvr_entry_create(const char *uuid, htsmsg_t *conf)
if (de->de_channel) {
LIST_FOREACH(de2, &de->de_channel->ch_dvrs, de_channel_link)
if(de2 != de &&
de2->de_channel == de->de_channel &&
de2->de_config == de->de_config &&
de2->de_start == de->de_start &&
de2->de_sched_state != DVR_COMPLETED &&
Expand Down
26 changes: 26 additions & 0 deletions src/dvr/dvr_rec.c
Expand Up @@ -27,6 +27,7 @@

#include "tvheadend.h"
#include "streaming.h"
#include "tcp.h"
#include "dvr.h"
#include "spawn.h"
#include "service.h"
Expand Down Expand Up @@ -65,6 +66,9 @@ dvr_rec_subscribe(dvr_entry_t *de)
int weight;
profile_t *pro;
profile_chain_t *prch;
struct sockaddr sa;
access_t *aa;
uint32_t rec_count, net_count;

assert(de->de_s == NULL);
assert(de->de_chain == NULL);
Expand All @@ -76,6 +80,28 @@ dvr_rec_subscribe(dvr_entry_t *de)

snprintf(buf, sizeof(buf), "DVR: %s", lang_str_get(de->de_title, NULL));

if (de->de_owner && de->de_owner[0] != '\0')
aa = access_get_by_username(de->de_owner);
else if (de->de_creator && de->de_creator[0] != '\0' &&
tcp_get_ip_from_str(de->de_creator, &sa) != NULL)
aa = access_get_by_addr(&sa);
else {
tvherror("dvr", "unable to find access");
return -1;
}

if (aa->aa_conn_limit) {
rec_count = dvr_usage_count(aa);
net_count = tcp_connection_count(aa);
if (rec_count + net_count >= aa->aa_conn_limit) {
tvherror("dvr", "multiple connections are not allowed for user '%s' from '%s' "
"(limit %u, active streaming %u, active DVR %u)",
aa->aa_username ?: "", aa->aa_representative ?: "",
aa->aa_conn_limit, rec_count, net_count);
return -1;
}
}

pro = de->de_config->dvr_profile;
prch = malloc(sizeof(*prch));
profile_chain_init(prch, pro, de->de_channel);
Expand Down
4 changes: 2 additions & 2 deletions src/htsp_server.c
Expand Up @@ -577,7 +577,7 @@ htsp_build_channel(channel_t *ch, const char *method, htsp_connection_t *htsp)
char buf[50];
addrlen = sizeof(addr);
getsockname(htsp->htsp_fd, (struct sockaddr*)&addr, &addrlen);
tcp_get_ip_str((struct sockaddr*)&addr, buf, 50);
tcp_get_str_from_ip((struct sockaddr*)&addr, buf, 50);
snprintf(url, sizeof(url), "http://%s%s%s:%d%s/%s",
(addr.ss_family == AF_INET6)?"[":"",
buf,
Expand Down Expand Up @@ -2794,7 +2794,7 @@ htsp_serve(int fd, void **opaque, struct sockaddr_storage *source,

// Note: global_lock held on entry

tcp_get_ip_str((struct sockaddr*)source, buf, 50);
tcp_get_str_from_ip((struct sockaddr*)source, buf, 50);

memset(&htsp, 0, sizeof(htsp_connection_t));
*opaque = &htsp;
Expand Down
2 changes: 1 addition & 1 deletion src/http.c
Expand Up @@ -721,7 +721,7 @@ process_request(http_connection_t *hc, htsbuf_queue_t *spill)
char authbuf[150];

hc->hc_url_orig = tvh_strdupa(hc->hc_url);
tcp_get_ip_str((struct sockaddr*)hc->hc_peer, authbuf, sizeof(authbuf));
tcp_get_str_from_ip((struct sockaddr*)hc->hc_peer, authbuf, sizeof(authbuf));
hc->hc_peer_ipstr = tvh_strdupa(authbuf);
hc->hc_representative = hc->hc_peer_ipstr;
hc->hc_username = NULL;
Expand Down
4 changes: 2 additions & 2 deletions src/satip/rtp.c
Expand Up @@ -168,7 +168,7 @@ satip_rtp_thread(void *aux)
char peername[50];
int alive = 1, fatal = 0, r;

tcp_get_ip_str((struct sockaddr *)&rtp->peer, peername, sizeof(peername));
tcp_get_str_from_ip((struct sockaddr *)&rtp->peer, peername, sizeof(peername));
tvhdebug("satips", "RTP streaming to %s:%d open", peername, rtp->port);

pthread_mutex_lock(&sq->sq_mutex);
Expand Down Expand Up @@ -611,7 +611,7 @@ satip_rtcp_thread(void *aux)
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
if (r < 0) {
err = errno;
tcp_get_ip_str((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf));
tcp_get_str_from_ip((struct sockaddr*)&rtp->peer2, addrbuf, sizeof(addrbuf));
tvhwarn("satips", "RTCP send to error %s:%d : %s",
addrbuf, IP_PORT(rtp->peer2), strerror(err));
}
Expand Down
2 changes: 1 addition & 1 deletion src/satip/rtsp.c
Expand Up @@ -1443,7 +1443,7 @@ rtsp_serve(int fd, void **opaque, struct sockaddr_storage *peer,

memset(&aa, 0, sizeof(aa));
strcpy(buf, "SAT>IP Client ");
tcp_get_ip_str((struct sockaddr *)peer, buf + strlen(buf), sizeof(buf) - strlen(buf));
tcp_get_str_from_ip((struct sockaddr *)peer, buf + strlen(buf), sizeof(buf) - strlen(buf));
aa.aa_representative = buf;

tcp = tcp_connection_launch(fd, rtsp_stream_status, &aa);
Expand Down
8 changes: 4 additions & 4 deletions src/satip/server.c
Expand Up @@ -348,7 +348,7 @@ CONFIGID.UPNP.ORG: 0\r\n"
return;

#if ENABLE_TRACE
tcp_get_ip_str((struct sockaddr *)dst, buf, sizeof(buf));
tcp_get_str_from_ip((struct sockaddr *)dst, buf, sizeof(buf));
tvhtrace("satips", "sending discover reply to %s:%d%s%s",
buf, IP_PORT(*dst), deviceid ? " device: " : "", deviceid ?: "");
#endif
Expand Down Expand Up @@ -453,7 +453,7 @@ satips_upnp_discovery_received
return;

#if ENABLE_TRACE
tcp_get_ip_str((struct sockaddr *)storage, buf2, sizeof(buf2));
tcp_get_str_from_ip((struct sockaddr *)storage, buf2, sizeof(buf2));
tvhtrace("satips", "received %s M-SEARCH from %s:%d",
conn->multicast ? "multicast" : "unicast",
buf2, ntohs(IP_PORT(*storage)));
Expand All @@ -465,7 +465,7 @@ satips_upnp_discovery_received
satip_server_deviceid += 1;
if (satip_server_deviceid >= 254)
satip_server_deviceid = 1;
tcp_get_ip_str((struct sockaddr *)storage, buf2, sizeof(buf2));
tcp_get_str_from_ip((struct sockaddr *)storage, buf2, sizeof(buf2));
tvhwarn("satips", "received duplicate SAT>IP DeviceID %s from %s:%d, using %d",
deviceid, buf2, ntohs(IP_PORT(*storage)), satip_server_deviceid);
satips_upnp_send_discover_reply(storage, deviceid);
Expand Down Expand Up @@ -571,7 +571,7 @@ void satip_server_init(int rtsp_port)
tvherror("satips", "Unable to determine the HTTP/RTSP address");
return;
}
tcp_get_ip_str((const struct sockaddr *)&http, http_ip, sizeof(http_ip));
tcp_get_str_from_ip((const struct sockaddr *)&http, http_ip, sizeof(http_ip));
http_server_ip = strdup(http_ip);
http_server_port = ntohs(IP_PORT(http));

Expand Down
69 changes: 58 additions & 11 deletions src/tcp.c
Expand Up @@ -39,6 +39,7 @@
#include "tvhpoll.h"
#include "notify.h"
#include "access.h"
#include "dvr/dvr.h"

int tcp_preferred_address_family = AF_INET;
int tcp_server_running;
Expand Down Expand Up @@ -352,25 +353,49 @@ tcp_read_timeout(int fd, void *buf, size_t len, int timeout)
*
*/
char *
tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen)
tcp_get_str_from_ip(const struct sockaddr *sa, char *dst, size_t maxlen)
{
if(sa == NULL || s == NULL)
if (sa == NULL || dst == NULL)
return NULL;

switch(sa->sa_family)
{
case AF_INET:
inet_ntop(AF_INET, &(((struct sockaddr_in*)sa)->sin_addr), s, maxlen);
inet_ntop(AF_INET, &(((struct sockaddr_in*)sa)->sin_addr), dst, maxlen);
break;
case AF_INET6:
inet_ntop(AF_INET6, &(((struct sockaddr_in6*)sa)->sin6_addr), s, maxlen);
inet_ntop(AF_INET6, &(((struct sockaddr_in6*)sa)->sin6_addr), dst, maxlen);
break;
default:
strncpy(s, "Unknown AF", maxlen);
strncpy(dst, "Unknown AF", maxlen);
return NULL;
}

return s;
return dst;
}

/**
*
*/
struct sockaddr *
tcp_get_ip_from_str(const char *src, struct sockaddr *sa)
{
if (sa == NULL || src == NULL)
return NULL;

if (strstr(src, ":")) {
sa->sa_family = AF_INET6;
if (inet_pton(AF_INET6, src, &(((struct sockaddr_in6*)sa)->sin6_addr)) != 1)
return NULL;
} else if (strstr(src, ".")) {
sa->sa_family = AF_INET;
if (inet_pton(AF_INET, src, &(((struct sockaddr_in*)sa)->sin_addr)) != 1)
return NULL;
} else {
return NULL;
}

return sa;
}

/**
Expand Down Expand Up @@ -406,6 +431,26 @@ static LIST_HEAD(, tcp_server_launch) tcp_server_launches = { 0 };
static LIST_HEAD(, tcp_server_launch) tcp_server_active = { 0 };
static LIST_HEAD(, tcp_server_launch) tcp_server_join = { 0 };

/**
*
*/
uint32_t
tcp_connection_count(access_t *aa)
{
tcp_server_launch_t *tsl;
uint32_t used = 0;

lock_assert(&global_lock);

if (aa == NULL)
return 0;

LIST_FOREACH(tsl, &tcp_server_active, alink)
if (!strcmp(aa->aa_representative ?: "", tsl->representative ?: ""))
used++;
return used;
}

/**
*
*/
Expand All @@ -414,7 +459,7 @@ tcp_connection_launch
(int fd, void (*status) (void *opaque, htsmsg_t *m), access_t *aa)
{
tcp_server_launch_t *tsl, *res;
uint32_t used = 0;
uint32_t used = 0, used2;
time_t started = dispatch_clock;

lock_assert(&global_lock);
Expand All @@ -439,10 +484,12 @@ tcp_connection_launch
if (res == NULL)
return NULL;

if (aa->aa_conn_limit && used >= aa->aa_conn_limit) {
if (aa->aa_conn_limit && used + (used2 = dvr_usage_count(aa)) >= aa->aa_conn_limit) {
if (started + 3 < dispatch_clock) {
tvherror("tcp", "multiple connections are not allowed for user '%s' from '%s' (limit %u)",
aa->aa_username ?: "", aa->aa_representative ?: "", aa->aa_conn_limit);
tvherror("tcp", "multiple connections are not allowed for user '%s' from '%s' "
"(limit %u, active streaming %u, active DVR %u)",
aa->aa_username ?: "", aa->aa_representative ?: "", aa->aa_conn_limit,
used, used2);
return NULL;
}
pthread_mutex_unlock(&global_lock);
Expand Down Expand Up @@ -854,7 +901,7 @@ tcp_server_connections ( void )
if (!tsl->status) continue;
c++;
e = htsmsg_create_map();
tcp_get_ip_str((struct sockaddr*)&tsl->peer, buf, sizeof(buf));
tcp_get_str_from_ip((struct sockaddr*)&tsl->peer, buf, sizeof(buf));
htsmsg_add_u32(e, "id", tsl->id);
htsmsg_add_str(e, "peer", buf);
htsmsg_add_s64(e, "started", tsl->started);
Expand Down
5 changes: 4 additions & 1 deletion src/tcp.h
Expand Up @@ -85,10 +85,13 @@ int tcp_write_queue(int fd, htsbuf_queue_t *q);

int tcp_read_timeout(int fd, void *buf, size_t len, int timeout);

char *tcp_get_ip_str(const struct sockaddr *sa, char *s, size_t maxlen);
char *tcp_get_str_from_ip(const struct sockaddr *sa, char *dst, size_t maxlen);

struct sockaddr *tcp_get_ip_from_str(const char *str, struct sockaddr *sa);

struct access;

uint32_t tcp_connection_count(struct access *aa);
void *tcp_connection_launch(int fd, void (*status) (void *opaque, htsmsg_t *m),
struct access *aa);
void tcp_connection_land(void *tcp_id);
Expand Down

0 comments on commit 808fd47

Please sign in to comment.