Skip to content
Newer
Older
100644 598 lines (490 sloc) 15.5 KB
be3f569 @rick Import of the 1.3-RC3 release from litestream.org CVS to github.
authored
1 /*
2 * Copyright (C) 2000 Gene Kan. All rights reserved. genehkan@xcf.berkeley.edu
3 * Copyright (C) 2002 Litestream.org:
4 * Rick Bradley (rick@rickbradley.com),
5 * Sean Jewett (sean@rimboy.com),
6 * Brandon D. Valentine (brandon@dvalentine.com).
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 *
13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 * 3. The names Gene Kan, Litestream.org, Rick Bradley, Sean Jewett, or
20 * Brandon D. Valentine may not be used to endorse products derived
21 * from this software or portions of this software without specific
22 * prior written permission.
23 *
24 * THIS SOFTWARE IS PROVIDED BY GENE KAN, LITESTREAM.ORG, RICK BRADLEY,
25 * SEAN JEWETT, AND BRANDON D. VALENTINE ``AS IS'' AND ANY EXPRESS OR
26 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
27 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE PROVIDERS BE LIABLE FOR ANY
29 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
30 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
31 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
33 * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
34 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE OR
35 * PORTIONS OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
36 * DAMAGE.
37 *
38 * $Id: restream.c,v 1.6 2005/09/06 21:26:02 roundeye Exp $
39 */
40
41 #include "stream_config.h"
42
43 #include <sys/time.h>
44 #include <sys/types.h>
45 #include <sys/socket.h>
46 #include <sys/resource.h>
47 #include <sys/param.h>
48
49 #include <netinet/in.h>
50 #include <arpa/inet.h>
51
52 #include <unistd.h>
53 #include <stdlib.h>
54 #include <stdio.h>
55 #include <string.h>
56 #include <errno.h>
57 #include <signal.h>
58 #include <time.h>
59 #include <fcntl.h>
60
61 #include "stream_sched.h"
62 #include "stream_inet.h"
63 #include "stream_std.h"
64
65 #include "icy.h"
66 #include "yp.h"
67
68 int g_time_to_die = 0;
69
70 time_t g_start_time;
71 int g_tick = 1;
72
73 unsigned char g_source_buf[131072];
74
75 icy_info g_icy_info;
76
77 char* g_mirrortag;
78
79 char* g_srchost;
80 int g_srcport;
81 char* g_strhost;
82 int g_strport;
83
84 int g_maxlisteners;
85
86 char* g_yp_host = NULL;
87 int g_yp_port;
88 icy_response g_yp_info;
89
90 stream_cli* g_source = NULL;
91
92 typedef struct _source_privdata {
93 int state;
94 #define SRC_SNDPROLOGUE 0
95 #define SRC_RCVPROLOGUEACK 20
96 #define SRC_STREAMING 40
97
98 char rbuf[MAX_MSGLEN];
99 char wbuf[MAX_MSGLEN];
100 } source_privdata;
101
102 typedef struct _server_stats {
103 unsigned long long int total_stream_received;
104 unsigned long long int total_stream_sent;
105 unsigned long long int overruns;
106 unsigned long long int overrun_bytes;
107
108 int peak_servers;
109 int total_servers;
110 } server_stats;
111
112 server_stats stats;
113
114 typedef struct _server_privdata {
115 int state;
116 #define STR_RCVREQUEST 0
117 #define STR_SNDHEADER 1
118 #define STR_SNDREPLY 2
119 #define STR_STREAMING 3
120
121 char rbuf[MAX_MSGLEN];
122 char wbuf[MAX_MSGLEN];
123 } server_privdata;
124
125 stream_serv* strservers[FD_SETSIZE];
126 int nstrservers = 0;
127
128 void clear_strtable (stream_serv* _server) {
129 int i;
130
131 for (i = 0; i < nstrservers; i++) {
132 if (strservers[i] == _server)
133 break;
134 }
135 if (i == nstrservers) {
136 stream_msg("failed to find");
137 return;
138 }
139
140 nstrservers--;
141
142 memmove(&strservers[i],
143 &strservers[i + 1],
144 sizeof(strservers[0]) * (nstrservers - i));
145 return;
146 }
147
148 void stream_cb
149 (stream_sched* _sched, int _s, short _events,
150 char* _rbuf, int _rbytes,
151 char* _wbuf, int _wbytes,
152 void* _privdata) {
153 stream_serv* server = (stream_serv*) _privdata;
154 server_privdata* privdata = (server_privdata*) server->privdata;
155 char message[MAX_MSGLEN];
156
157 extern int fd_setsize; /* from stream_sched.c */
158
159
160 if (_events & STREAMHUP) {
161 sprintf(message, "client %s disconnected",
162 in2ascii(server->addr));
163 stream_msg(message);
164
165 if (privdata->state == STR_STREAMING)
166 clear_strtable(server);
167 stream_serv_close(_sched, server);
168 free(privdata);
169 return;
170 }
171
172 switch (privdata->state) {
173 case STR_RCVREQUEST:
174 _rbuf[_rbytes] = '\0';
175
176 if (!strstr(_rbuf, "\r\n\r\n")) {
177 if (_rbytes == sizeof(privdata->rbuf)) {
178 sprintf(message, "client %s invalid request",
179 in2ascii(server->addr));
180 stream_msg(message);
181
182 stream_serv_close(_sched, server);
183 free(privdata);
184 return;
185 }
186
187 break;
188 }
189
190 if (strstr(_rbuf, "play.pls")) {
191 privdata->state = STR_SNDREPLY;
192
193 sprintf(message,
194 "[playlist]\n"
195 "numberofentries=1\n"
196 "File1=http://%s:%d\r\n\r\n",
197 g_strhost, g_strport);
198
199 sprintf(privdata->wbuf,
200 "HTTP/1.0 200 OK\r\n"
201 "Content-Length: %d\r\n"
202 "Content-Type: audio/x-scpls\r\n\r\n"
203 "\r\n"
204 "%s",
205 strlen(message), message);
206
207 stream_msg("STR_RCVREQUEST request pls");
208
209 }
210 else if (strstr(_rbuf, "stats.html")){
211
212 struct rusage usage;
213 struct rlimit lim;
214 time_t diff_time, h, m, s;
215
216 privdata->state = STR_SNDREPLY;
217
218 getrusage(RUSAGE_SELF, &usage);
219 getrlimit(RLIMIT_NOFILE, &lim);
220 diff_time = time(NULL) - g_start_time;
221 h = diff_time / 3600;
222 m = (diff_time - (h * 3600)) / 60;
223 s = diff_time - (h * 3600) - (m * 60);
224
225 sprintf(message,
226 "<html><head>\r\n"
227 "<title>Litestream statistics: %s</title>\r\n"
228 "</head>\r\n"
229 "<body>\r\n"
230 "<h1><a href=\"%s\">%s</a> (%s) %s Kb</h1>\r\n"
231 "<h3>%s mirror of <a href=\"http://%s:%d\">http://%s:%d</a></h3>\r\n"
232 "<center><table width=\"50%%\">\r\n"
233 "<tr><td>Source</td><td>%s</td></tr>\r\n"
234 "<tr><td>Uptime</td><td align=\"right\">%02ld:%02ld:%02ld</td></tr>\r\n"
235 "<tr><td>Max fd</td><td align=\"right\">%d</td></tr>\r\n"
236 "<tr><td>Current/peak/total/max clients</td><td align=\"right\">%d/%d/%d/%ld</td></tr>\r\n"
237 "<tr><td>I/O bytes</td><td align=\"right\">%qu/%qu</td></tr>\r\n"
238 "<tr><td>I/O messages</td><td align=\"right\">%ld/%ld</td></tr>\r\n"
239 "<tr><td>Overrun packets/bytes</td><td align=\"right\">%qu/%qu</td></tr>\r\n"
240 "<tr><td>User time</td><td align=\"right\">%ld sec %ld usec</td></tr>\r\n"
241 "<tr><td>System time</td><td align=\"right\">%ld sec %ld usec</td></tr>\r\n"
242 "<tr><td>Max RSS</td><td align=\"right\">%ld</td></tr>\r\n"
243 "<tr><td>I/O blocks</td><td align=\"right\">%ld/%ld</td></tr>\r\n"
244 "<tr><td>Signals</td><td align=\"right\">%ld</td></tr>\r\n"
245 "<tr><td>V/I context switches</td><td align=\"right\">%ld/%ld</td></tr>\r\n"
246 "</table></center>\r\n"
247 "</body></html>\r\n",
248 g_icy_info.name,
249 g_icy_info.url,
250 g_icy_info.name,
251 g_icy_info.genre, g_icy_info.br,
252 g_mirrortag, g_srchost, g_srcport, g_srchost, g_srcport,
253 g_source ? "Connected" : "Not connected",
254 h, m, s,
255 _sched->maxfd,
256 nstrservers, stats.peak_servers, stats.total_servers, (long) MIN(g_maxlisteners, MIN(fd_setsize, lim.rlim_cur)),
257 stats.total_stream_received, stats.total_stream_sent,
258 usage.ru_msgrcv, usage.ru_msgsnd,
259 stats.overruns, stats.overrun_bytes,
260 usage.ru_utime.tv_sec, usage.ru_utime.tv_usec,
261 usage.ru_stime.tv_sec, usage.ru_stime.tv_usec,
262 usage.ru_maxrss,
263 usage.ru_inblock, usage.ru_oublock,
264 usage.ru_nsignals,
265 usage.ru_nvcsw, usage.ru_nivcsw);
266
267 sprintf(privdata->wbuf,
268 "HTTP/1.0 200 OK\r\n"
269 "Content-Length: %d\r\n"
270 "Content-Type: text/html\r\n"
271 "\r\n"
272 "%s",
273 strlen(message), message);
274
275 stream_msg("STR_RCVREQUEST request stats");
276 }else {
277
278 privdata->state = STR_SNDHEADER;
279
280 if (g_source)
281 sprintf(privdata->wbuf,
282 "HTTP/1.0 200 OK\r\n"
283 "Server: " VERSION "\r\n"
284 "icy-name:%s\r\n"
285 "icy-genre:%s\r\n"
286 "icy-url:%s\r\n"
287 "icy-irc:%s\r\n"
288 "icy-icq:%s\r\n"
289 "icy-aim:%s\r\n"
290 "icy-pub:%d\r\n"
291 "icy-br:%s\r\n"
292 "\r\n",
293 g_icy_info.name,
294 g_icy_info.genre,
295 g_icy_info.url,
296 g_icy_info.irc,
297 g_icy_info.icq,
298 g_icy_info.aim,
299 g_icy_info.pub,
300 g_icy_info.br);
301 else
302 strcpy(privdata->wbuf,
303 "HTTP/1.0 200 OK\r\n"
304 "Server: " VERSION "\r\n"
305 "\r\n");
306
307 stream_msg("STR_RCVREQUEST request stream");
308 }
309
310
311 stream_sched_post_write(_sched, server->s, privdata->wbuf, strlen(privdata->wbuf));
312 break;
313 case STR_SNDREPLY:
314 stream_msg("STR_SNDREPLY done");
315 stream_serv_close(_sched, server);
316 free(privdata);
317 break;
318 case STR_SNDHEADER:
319 if (nstrservers == g_maxlisteners) {
320 stream_msg("STR_SNDHEADER max listeners reached");
321 stream_serv_close(_sched, server);
322 free(privdata);
323 return;
324 }
325 privdata->state = STR_STREAMING;
326
327 strservers[nstrservers++] = server;
328
329 stats.total_servers++;
330 if (nstrservers > stats.peak_servers)
331 stats.peak_servers = nstrservers;
332
333 stream_sched_post_read(_sched, server->s, privdata->rbuf, sizeof(privdata->rbuf));
334 break;
335 case STR_STREAMING:
336 stream_sched_post_read(_sched, server->s, privdata->rbuf, sizeof(privdata->rbuf));
337 break;
338 }
339 return;
340 }
341
342 void stream_listen_cb
343 (stream_sched* _sched, int _s, short _events,
344 char* _rbuf, int _rbytes,
345 char* _wbuf, int _wbytes,
346 void* _privdata) {
347 stream_serv* server;
348 server_privdata* privdata;
349 char message[MAX_MSGLEN];
350
351
352 privdata = (server_privdata*) malloc(sizeof(server_privdata));
353 privdata->state = STR_RCVREQUEST;
354
355 server = stream_serv_accept(_sched,
356 _s,
357 stream_cb,
358 privdata);
359 if (!server) {
360 stream_err("dead connect");
361 free(privdata);
362 return;
363 }
364
365 sprintf(message, "client %s connected",
366 in2ascii(server->addr));
367 stream_msg(message);
368
369 stream_sched_post_read(_sched, server->s, privdata->rbuf, sizeof(privdata->rbuf));
370 return;
371 }
372
373 void source_cb
374 (stream_sched* _sched, int _s, short _events,
375 char* _rbuf, int _rbytes,
376 char* _wbuf, int _wbytes,
377 void* _privdata) {
378 stream_cli* source = (stream_cli*) _privdata;
379 source_privdata* privdata = (source_privdata*) source->privdata;
380 int i;
381 int bytes;
382
383 struct sockaddr_in peer;
384 int peer_len;
385 int flags;
386
387
388 if (_events & STREAMHUP) {
389 stream_msg("source disconnected");
390 stream_cli_close(_sched, source);
391 free(privdata);
392 g_source = NULL;
393 return;
394 }
395
396 switch (privdata->state) {
397 case SRC_SNDPROLOGUE:
398 privdata->state = SRC_RCVPROLOGUEACK;
399 stream_sched_post_read(_sched, source->s, privdata->rbuf, sizeof(privdata->rbuf));
400 break;
401 case SRC_RCVPROLOGUEACK:
402 _rbuf[_rbytes] = '\0';
403
404 /* omaha */
405 if (!strstr(_rbuf, "\r\n\r\n")
406 && !strstr(_rbuf, "\n\n")) {
407 if (_rbytes == sizeof(privdata->rbuf)) {
408 stream_msg("invalid prologue ack");
409
410 stream_cli_close(_sched, source);
411 free(privdata);
412 g_source = NULL;
413 return;
414 }
415
416 break;
417 }
418
419 parse_icy_info(_rbuf, _rbytes, &g_icy_info);
420
421 strcat(g_icy_info.name, " (");
422 strcat(g_icy_info.name, g_mirrortag);
423 strcat(g_icy_info.name, " mirror)");
424
425 if (g_yp_host && g_icy_info.pub)
426 yp_create(_sched, g_yp_host, g_yp_port, &g_icy_info);
427
428 stream_msg("source stream begin");
429 privdata->state = SRC_STREAMING;
430 stream_sched_post_read(_sched, source->s, g_source_buf, sizeof(g_source_buf));
431 break;
432 case SRC_STREAMING:
433 stats.total_stream_received += _rbytes;
434
435 for (i = 0; i < nstrservers; i++) {
436 if (((server_privdata*) strservers[i]->privdata)->state == STR_STREAMING) {
437 /* get peer address information */
438 peer_len = sizeof(peer);
439 if(-1 != getpeername(strservers[i]->s, (struct sockaddr *)&peer, &peer_len)) {
440 }
441
442 /* there is a problem with fcntl(O_NONBLOCK) on Linux
443 * for whatever reason the setting doesn't stick. On
444 * Linux servers we have to re-set the O_NONBLOCK bit
445 * or else we will block for all clients if one
446 * client pulls the plug. Linux is for bitches.
447 */
448 if ((flags = fcntl(strservers[i]->s, F_GETFL, 0)) < 0) {
449 stream_err("fcntl F_GETFL failed");
450 } else {
451 if (!(flags & O_NONBLOCK)) {
452 flags |= O_NONBLOCK;
453 if (fcntl(strservers[i]->s, F_SETFL, flags) < 0) {
454 }
455 }
456 }
457
458 bytes = write(strservers[i]->s, _rbuf, _rbytes);
459 if (bytes == -1) {
460 if (errno == EAGAIN) {
461 stats.overruns++;
462 stats.overrun_bytes += _rbytes;
463 }
464 /* An interesting problem. If we close the stream here, it might still
465 be active on the read side, which could cause a double-close/double-free.
466 And I didn't even free the privdata here to begin with!
467 */
468 /* else {
469 stream_err("SRC_STREAMING stream write");
470 clear_strtable(strservers[i]);
471 stream_serv_close(_sched, strservers[i]);
472 }*/
473 continue;
474 } else {
475 }
476 stats.total_stream_sent += bytes;
477 }
478 }
479
480 stream_sched_post_read(_sched, source->s, g_source_buf, sizeof(g_source_buf));
481 break;
482 }
483 return;
484 }
485
486 stream_cli* new_source
487 (stream_sched* _sched, char* _source_ip, int _port) {
488 stream_cli* source;
489 source_privdata* privdata;
490 static char message[] = "GET /\r\nUser-Agent: xmms\r\n\r\n";
491
492
493 privdata = (source_privdata*) malloc(sizeof(source_privdata));
494 privdata->state = SRC_SNDPROLOGUE;
495
496 source = stream_cli_new(_sched, _source_ip, _port, privdata, source_cb);
497 if (!source) {
498 stream_err("");
499 goto bad;
500 }
501
502 stream_sched_post_write(_sched, source->s, message, strlen(message));
503
504 return(source);
505
506 bad:
507 free(privdata);
508 return(NULL);
509 }
510
511 void sigint_handler (int x) {
512 g_time_to_die = 1;
513 }
514
515 void sigalrm_handler (int x) {
516 g_tick = 1;
517 }
518
519 int main (int argc, char* argv[]) {
520 stream_sched* sched;
521 stream_serv* stream;
522
523 time_t time_now, last_touch = 0, last_attempt = 0;
524 struct itimerval itimer = { { 10, 0 }, { 10, 0 } }; /* Every 10 seconds. */
525
526 char* log_ident;
527
528
529 if (argc < 8 || argc > 10) {
530 fprintf(stderr, "Usage: %s <mirror tag> <source host> <source port> <stream host> <stream port> <max listeners> <log ident> [<yp host> <yp port>]\n", argv[0]);
531 return(-1);
532 }
533
534 g_mirrortag = argv[1];
535 g_srchost = argv[2];
536 g_srcport = atoi(argv[3]);
537 g_strhost = argv[4];
538 g_strport = atoi(argv[5]);
539 g_maxlisteners = atoi(argv[6]);
540 log_ident = argv[7];
541
542 if (argc == 10) {
543 g_yp_host = argv[8];
544 g_yp_port = atoi(argv[9]);
545 }
546
547 signal(SIGPIPE, SIG_IGN);
548 signal(SIGTRAP, SIG_IGN);
549 signal(SIGINT, sigint_handler);
550 signal(SIGALRM, sigalrm_handler);
551
552 setitimer(ITIMER_REAL, &itimer, NULL);
553
554 stream_openlog(log_ident);
555 stream_msg("server started");
556
557 g_start_time = time(NULL);
558
559 sched = stream_sched_new();
560
561 stream = stream_serv_new(sched, g_strport, 100, NULL, stream_listen_cb);
562 if (!stream) {
563 stream_err("stream create");
564 return(1);
565 }
566
567 while (!g_time_to_die) {
568 if (g_tick) {
569
570 g_tick = 0;
571 time_now = time(NULL);
572
573 if (!g_source && time_now - last_attempt > 30) {
574 last_attempt = time_now;
575 g_source = new_source (sched, g_srchost, g_srcport);
576 }
577
578 if (g_source && strlen(g_yp_info.id) &&
579 (time_now - last_touch) > (g_yp_info.tchfrq * 60)) {
580
581 char args[MAX_MSGLEN];
582
583 last_touch = time_now;
584
585 sprintf(args,
586 "id=%s&p=%d&li=%d&alt=0",
587 g_yp_info.id, g_strport, nstrservers);
588 yp_touch(sched, g_yp_host, g_yp_port, args);
589 }
590 }
591 stream_sched_run(sched, 10000);
592 }
593
594 stream_msg("server exited normally");
595 stream_closelog();
596 exit(0);
597 }
Something went wrong with that request. Please try again.