Skip to content
Browse files

initial import. mostly works

  • Loading branch information...
0 parents commit 5bc43c24ca70b7706ed36ec1718ad8de1aa28cae @revmischa committed Jul 25, 2010
5 MANIFEST
@@ -0,0 +1,5 @@
+Makefile.PL
+MANIFEST
+README
+t/RTSP-Server.t
+lib/RTSP/Server.pm
18 Makefile.PL
@@ -0,0 +1,18 @@
+use 5.010000;
+use ExtUtils::MakeMaker;
+
+WriteMakefile(
+ NAME => 'RTSP::Server',
+ VERSION_FROM => 'lib/RTSP/Server.pm',
+ PREREQ_PM => {
+ 'Moose' => 0,
+ 'namespace::autoclean' => 0,
+ 'AnyEvent::Handle' => 0,
+ 'AnyEvent::Socket' => 0,
+ 'AnyEvent::Util' => 0,
+ 'Socket' => 0,
+ },
+ ($] >= 5.005 ?
+ (ABSTRACT_FROM => 'lib/RTSP/Server.pm',
+ AUTHOR => 'Mischa Spiegelmock <revmischa@cpan.org>') : ()),
+);
30 README
@@ -0,0 +1,30 @@
+RTSP-Server version
+========================
+
+This module is designed to accept a number of video input feeds, allow
+clients to connect and transmit RTSP commands to receive RTP data.
+
+INSTALLATION
+
+To install this module type the following:
+
+ perl Makefile.PL
+ make
+ make test
+ make install
+
+DEPENDENCIES
+
+This module requires these other modules and libraries:
+
+ AnyEvent::Socket
+
+COPYRIGHT AND LICENCE
+
+Copyright (C) 2010 by Mischa Spiegelmock
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself, either Perl version 5.10.0 or,
+at your option, any later version of Perl 5 you may have available.
+
+
222 lib/RTSP/Server.pm
@@ -0,0 +1,222 @@
+package RTSP::Server;
+
+#use 5.010000;
+use Moose;
+use namespace::autoclean;
+
+use RTSP::Server::Logger;
+use RTSP::Server::Source;
+use RTSP::Server::Client;
+
+our $VERSION = '0.01';
+our $RTP_START_PORT = 20_000;
+
+## configuration attributes
+
+has 'client_listen_port' => (
+ is => 'rw',
+ isa => 'Int',
+ default => '5454',
+);
+
+has 'source_listen_port' => (
+ is => 'rw',
+ isa => 'Int',
+ default => '5455',
+);
+
+has 'client_listen_address' => (
+ is => 'rw',
+ isa => 'Str',
+ default => '0.0.0.0',
+);
+
+has 'source_listen_address' => (
+ is => 'rw',
+ isa => 'Str',
+ default => '0.0.0.0',
+);
+
+has 'log_level' => (
+ is => 'rw',
+ isa => 'Int',
+ default => 2,
+);
+
+## internal attributes
+
+has 'rtp_start_port' => (
+ is => 'rw',
+ isa => 'Int',
+ default => $RTP_START_PORT,
+);
+
+has 'source_server' => (
+ is => 'rw',
+ clearer => 'close_source_server',
+);
+
+has 'client_server' => (
+ is => 'rw',
+ clearer => 'close_client_server',
+);
+
+has 'logger' => (
+ is => 'rw',
+ isa => 'RTSP::Server::Logger',
+ handles => [qw/ trace debug info warn error /],
+ lazy => 1,
+ builder => 'build_logger',
+);
+
+# map of uri => Mount
+has 'mounts' => (
+ is => 'rw',
+ isa => 'HashRef',
+ default => sub { {} },
+ lazy => 1,
+);
+
+sub next_rtp_start_port {
+ my ($self) = @_;
+
+ my $port = $self->rtp_start_port;
+ $self->rtp_start_port($port + 2);
+
+ return $port;
+}
+
+# call from time to time to keep things tidy
+sub housekeeping {
+ my ($self) = @_;
+
+ # if we have no more mount points, it's safe to reset the rtp
+ # start ports
+ unless (keys %{ $self->mounts }) {
+ $self->rtp_start_port($RTP_START_PORT);
+ }
+}
+
+# call this to start the server
+sub listen {
+ my ($self) = @_;
+
+ print "Starting RTSP server, log level = " . $self->log_level . "\n";
+
+ my $source_server = $self->start_source_server;
+ my $client_server = $self->start_client_server;
+}
+
+sub start_client_server {
+ my ($self) = @_;
+
+ $self->close_client_server;
+
+ my $bind_ip = $self->client_listen_address;
+ my $bind_port = $self->client_listen_port;
+
+ my $server = RTSP::Server::Client->new(
+ listen_address => $bind_ip,
+ listen_port => $bind_port,
+ server => $self,
+ );
+
+ $server->listen;
+
+ $self->client_server($server);
+ $self->info("Client server started");
+
+ return $server;
+}
+
+sub start_source_server {
+ my ($self) = @_;
+
+ $self->close_source_server;
+
+ my $bind_ip = $self->source_listen_address;
+ my $bind_port = $self->source_listen_port;
+
+ my $server = RTSP::Server::Source->new(
+ listen_address => $bind_ip,
+ listen_port => $bind_port,
+ server => $self,
+ );
+
+ $server->listen;
+
+ $self->source_server($server);
+ $self->info("Source server started");
+
+ return $server;
+}
+
+sub build_logger {
+ my ($self) = @_;
+
+ return RTSP::Server::Logger->new(
+ log_level => $self->log_level
+ );
+}
+
+1;
+
+__END__
+
+=head1 NAME
+
+RTSP::Server - Lightweight RTSP/RTP server. Like icecast, for video.
+
+=head1 SYNOPSIS
+
+ use AnyEvent;
+ use RTSP::Server;
+
+ my $srv = new RTSP::Server(
+ mount_points => [qw/ stream1.rtsp stream2.rtsp /],
+ max_clients => 10,
+ );
+
+ # listen and accept incoming connections asynchronously
+ # (returns immediately)
+ $srv->listen;
+
+ # main loop
+ my $cv = AnyEvent->condvar;
+ # ...
+ $cv->recv;
+
+ undef $srv; # when the server goes out of scope, all sockets will
+ # be cleaned up
+
+=head1 DESCRIPTION
+
+This server is designed to enable to rebroadcasting of RTSP/RTP
+streams to clients.
+
+=head2 EXPORT
+
+None by default.
+
+=head1 TODO
+
+Authentication, automated tests.
+
+=head1 SEE ALSO
+
+L<RTSP::Proxy>, L<RTSP::Client>, L<AnyEvent::Socket>
+
+=head1 AUTHOR
+
+Mischa Spiegelmock, E<lt>revmischa@cpan.orgE<gt>
+
+=head1 COPYRIGHT AND LICENSE
+
+Copyright (C) 2010 by Mischa Spiegelmock
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself, either Perl version 5.10.0 or,
+at your option, any later version of Perl 5 you may have available.
+
+
+=cut
17 lib/RTSP/Server/Client.pm
@@ -0,0 +1,17 @@
+# This class represents a server which listens and accepts client
+# requests to stream video
+
+package RTSP::Server::Client;
+
+use Moose;
+ with 'RTSP::Server::Listener';
+
+use namespace::autoclean;
+
+has 'connection_class' => (
+ is => 'ro',
+ isa => 'Str',
+ default => 'Client',
+);
+
+__PACKAGE__->meta->make_immutable;
135 lib/RTSP/Server/Client/Connection.pm
@@ -0,0 +1,135 @@
+# This class represents a client connection, which can request streams
+# of video
+
+package RTSP::Server::Client::Connection;
+
+use Moose;
+ with 'RTSP::Server::Connection';
+
+use namespace::autoclean;
+use Socket;
+
+has 'client_socket' => (
+ is => 'rw',
+ clearer => 'clear_client_socket',
+);
+
+# packed sock addr of client
+has 'client_socket_dest' => (
+ is => 'rw',
+);
+
+has 'client_rtp_port' => (
+ is => 'rw',
+ isa => 'Int',
+);
+
+around 'public_options' => sub {
+ my ($orig, $self) = @_;
+
+ return ($self->$orig, qw/SETUP PLAY STOP/);
+};
+
+before 'teardown' => sub {
+ my ($self) = @_;
+
+ $self->close_socket;
+};
+
+sub play {
+ my ($self) = @_;
+
+ # should have this from SETUP
+ my $port = $self->client_rtp_port
+ or return $self->bad_request;
+
+ # find requested mount
+ my $mount = $self->get_mount;
+ unless ($mount) {
+ $self->not_found;
+ return;
+ }
+
+ # TODO: check auth
+
+ # create UDP socket
+ my($name, $alias, $udp_proto) = AnyEvent::Socket::getprotobyname('udp');
+ socket my($sock), PF_INET, SOCK_DGRAM, $udp_proto;
+ AnyEvent::Util::fh_nonblocking $sock, 1;
+ my $dest = sockaddr_in($port, Socket::inet_aton($self->client_address));
+
+ $self->client_socket_dest($dest);
+ $self->client_socket($sock);
+
+ $self->push_ok;
+}
+
+sub stop {
+ my ($self) = @_;
+
+ $self->close_socket;
+
+ $self->push_ok;
+}
+
+sub setup {
+ my ($self) = @_;
+
+ my $mount_path = $self->get_mount_path
+ or return $self->bad_request;
+
+ # strip off stream_id for now
+ my ($stream_id) = $mount_path =~ s!/streamid=(\d+)!!sgm;
+ $self->debug("setup stream id $stream_id");
+
+ my $mount = $self->get_mount($mount_path)
+ or return $self->not_found;
+
+ # should have transport header
+ my $transport = $self->get_req_header('Transport')
+ or return $self->bad_request;
+
+ # parse client ports out of transport header
+ my ($client_rtp_start_port, $client_rtp_end_port) =
+ $transport =~ m/client_port=(\d+)(?:\-(\d+))/smi;
+
+ unless ($client_rtp_start_port) {
+ $self->warn("Failed to find client RTP start port in SETUP request");
+ return $self->bad_request;
+ }
+
+ # save starting RTP port
+ $self->client_rtp_port($client_rtp_start_port);
+
+ $mount->add_client($self);
+
+ $self->push_ok;
+}
+
+sub send_packet {
+ my ($self, $pkt) = @_;
+
+ return unless $self->client_socket;
+
+ send $self->client_socket, $pkt, 0, $self->client_socket_dest;
+}
+
+sub close_socket {
+ my ($self) = @_;
+
+ my $mount = $self->get_mount;
+ $mount->add_client($self) if $mount;
+
+ my $sock = $self->client_socket or return;
+ shutdown $sock, 1; # done writing
+ $self->clear_client_socket;
+}
+
+sub DEMOLISH {
+ my ($self) = @_;
+
+ $self->close_socket;
+}
+
+__PACKAGE__->meta->make_immutable;
+
390 lib/RTSP/Server/Connection.pm
@@ -0,0 +1,390 @@
+package RTSP::Server::Connection;
+
+use Moose::Role;
+use namespace::autoclean;
+
+use RTSP::Server::Session;
+use RTSP::Server::Mount;
+
+use Carp qw/croak/;
+use URI;
+
+has 'id' => (
+ is => 'ro',
+ isa => 'Int',
+ required => 1,
+);
+
+has 'client_address' => (
+ is => 'ro',
+ isa => 'Str',
+ required => 1,
+);
+
+has 'client_port' => (
+ is => 'ro',
+ isa => 'Int',
+ required => 1,
+);
+
+has 'handle' => (
+ is => 'rw',
+ accessor => 'h',
+);
+
+has 'current_method' => (
+ is => 'rw',
+ isa => 'Str',
+ clearer => 'clear_current_method',
+);
+
+has 'req_uri' => (
+ is => 'rw',
+ isa => 'Str',
+ clearer => 'clear_req_uri',
+);
+
+has 'expecting_header' => (
+ is => 'rw',
+ isa => 'Bool',
+);
+
+has 'body' => (
+ is => 'rw',
+ isa => 'Str',
+ clearer => 'clear_body',
+);
+
+# map of header => \@values
+has 'req_headers' => (
+ is => 'rw',
+ isa => 'HashRef',
+ clearer => 'clear_req_headers',
+ lazy => 1,
+ default => sub { {} },
+);
+
+# map of header => \@values
+has 'resp_headers' => (
+ is => 'rw',
+ isa => 'HashRef',
+ clearer => 'clear_resp_headers',
+ lazy => 1,
+ default => sub { {} },
+);
+
+has 'session' => (
+ is => 'rw',
+ isa => 'RTSP::Server::Session',
+ handles => [qw/ session_id rtp_start_port rtp_end_port
+ rtp_port_range get_rtp_listen_ports /],
+ lazy => 1,
+ builder => 'build_session',
+ clearer => 'clear_session',
+);
+
+has 'server' => (
+ is => 'ro',
+ isa => 'RTSP::Server',
+ required => 1,
+ handles => [qw/ next_rtp_start_port mounts trace debug info warn error /],
+);
+
+# should return a list of supported methods
+sub public_options {
+ return qw/OPTIONS DESCRIBE TEARDOWN/;
+}
+
+sub private_options {
+ return qw//;
+}
+
+sub teardown {
+ my ($self) = @_;
+
+ $self->clear_session;
+}
+
+sub describe {
+ my ($self) = @_;
+
+ my $mount = $self->get_mount
+ or return $self->not_found;
+
+ $self->push_ok($mount->sdp);
+}
+
+sub options {
+ my ($self) = @_;
+
+ my @pub_methods = $self->public_options;
+ my @priv_methods = $self->private_options;
+
+ $self->add_resp_header('Public', join(', ', @pub_methods));
+ $self->add_resp_header('Private', join(', ', @priv_methods))
+ if @priv_methods;
+
+ $self->push_ok;
+}
+
+sub bad_request {
+ my ($self) = @_;
+ $self->push_response(400, "Bad Request");
+}
+
+sub not_found {
+ my ($self) = @_;
+
+ $self->info("Returning 404 for " . $self->req_uri);
+
+ $self->push_response(404, "Not Found");
+}
+
+sub internal_server_error {
+ my ($self) = @_;
+ $self->push_response(500, "Internal Server Error");
+}
+
+sub push_ok {
+ my ($self, $body) = @_;
+ $self->push_response(200, 'OK', $body);
+}
+
+sub build_session {
+ my ($self) = @_;
+
+ my $sess = RTSP::Server::Session->new(
+ rtp_start_port => $self->next_rtp_start_port,
+ );
+
+ return $sess;
+}
+
+sub request_finished {
+ my ($self) = @_;
+
+ my $method = $self->current_method;
+ unless ($method) {
+ $self->error("Finished parsing request but did not find method");
+ return;
+ }
+
+ $self->handle_request;
+}
+
+sub handle_request {
+ my ($self) = @_;
+
+ unless ($self->current_method) {
+ croak "handle_request called without current_method set";
+ }
+
+ my $method = lc $self->current_method;
+
+ # TODO: check auth
+ my @allowed_methods = ($self->public_options, $self->private_options);
+ if (grep { lc $_ eq $method } @allowed_methods) {
+ my $ok = eval {
+ $self->$method;
+ 1;
+ };
+
+ if (! $ok || $@) {
+ $self->error("Error handling " . uc($method) . ": " .
+ ($@ || 'unknown error'));
+ }
+ } else {
+ $self->push_response(405, 'Method Not Allowed');
+ }
+
+ $self->reset;
+}
+
+sub add_req_header {
+ my ($self, $hdr, $val) = @_;
+
+ $self->req_headers->{$hdr} ||= [];
+ my $vals = $self->req_headers->{$hdr};
+ push @$vals, $val;
+
+ return $val;
+}
+
+sub add_resp_header {
+ my ($self, $hdr, $val) = @_;
+
+ $self->resp_headers->{$hdr} ||= [];
+ my $vals = $self->resp_headers->{$hdr};
+ push @$vals, $val;
+
+ return $val;
+}
+
+# get a single header value. warns if multiple values are found
+sub get_req_header {
+ my ($self, $hdr) = @_;
+
+ my $vals = $self->req_headers->{$hdr} or return;
+ if (@$vals > 1) {
+ $self->warn("Found multiple values for request header '$hdr' but expected only one");
+ }
+
+ return $vals->[0];
+}
+
+# same as above
+sub get_resp_header {
+ my ($self, $hdr) = @_;
+
+ my $vals = $self->resp_headers->{$hdr} or return;
+ if (@$vals > 1) {
+ $self->warn("Found multiple values for response header '$hdr' but expected only one");
+ }
+
+ return $vals->[0];
+}
+
+sub push_response {
+ my ($self, $code, $msg, $body) = @_;
+
+ return unless $self->h;
+ $self->push_resp_line("RTSP/1.0 $code $msg");
+
+ # push headers
+ foreach my $hdr (keys %{ $self->resp_headers }) {
+ foreach my $val (@{ $self->resp_headers->{$hdr} }) {
+ $self->trace("header: $hdr, val: $val");
+ $self->push_resp_line("$hdr: $val");
+ }
+ }
+
+ # add content-length header if there's a body to return
+ $self->push_resp_line("Content-Length: " . length($body))
+ if $body;
+
+ # add cseq, if available
+ my $cseq = $self->req_cseq;
+ $self->push_resp_line("CSeq: $cseq") if $cseq;
+
+ # add session id, if available
+ if ($self->session) {
+ my $session_id = $self->session->session_id;
+ $self->push_resp_line("Session: $session_id");
+ }
+
+ # end of headers
+ $self->h->push_write("\r\n");
+
+ # body?
+ $self->h->push_write($body) if $body;
+
+ $self->info("Returning error $code: $msg")
+ if $code !~ /2\d\d/;
+}
+
+sub push_resp_line {
+ my ($self, $line) = @_;
+
+ $self->trace(" << $line");
+ $self->h->push_write("$line\r\n");
+}
+
+sub req_cseq {
+ my ($self) = @_;
+
+ return $self->get_req_header('cseq') ||
+ $self->get_req_header('Cseq') ||
+ $self->get_req_header('CSeq');
+}
+
+sub req_content_length {
+ my ($self) = @_;
+
+ return $self->get_req_header('content-length') ||
+ $self->get_req_header('Content-length') ||
+ $self->get_req_header('Content-Length');
+}
+
+# parse a uri, find the path
+sub get_mount_path {
+ my ($self, $uri) = @_;
+
+ $uri ||= $self->req_uri or return;
+ my $u = new URI($uri) or return;
+
+ my $path = $u->path or return;
+
+ return $path;
+}
+
+# get a stream
+sub get_mount {
+ my ($self, $path) = @_;
+
+ $path ||= $self->get_mount_path or return;
+ return $self->mounts->{$path};
+}
+
+# returns new mount point
+sub mount {
+ my ($self, %opts) = @_;
+
+ # check args
+ my ($path, $sdp);
+ {
+ $path = delete $opts{path}
+ or croak "Connection->mount() called with no path";
+
+ $sdp = delete $opts{sdp}
+ or croak "Connection->mount() called with no SDP info";
+
+ croak 'Unknown options: ' . join(', ', keys %opts)
+ if keys %opts;
+ }
+
+ # unmount existing mountpoint if it exists
+ $path ||= $self->get_mount_path or return;
+ $self->unmount($path) if $self->get_mount($path);
+
+ # create mount point
+ my $mount = new RTSP::Server::Mount(
+ path => $path,
+ sdp => $sdp,
+ );
+
+ $self->mounts->{$path} = $mount;
+
+ $self->info("Mounted $path");
+
+ return $mount;
+}
+
+# delete a stream
+sub unmount {
+ my ($self, $path) = @_;
+
+ $path ||= $self->get_mount_path or return;
+ delete $self->mounts->{$path};
+
+ $self->info("Unmounting $path");
+}
+
+sub reset {
+ my ($self) = @_;
+
+ $self->clear_req_headers;
+ $self->clear_resp_headers;
+
+ $self->clear_req_uri;
+ $self->clear_current_method;
+ $self->clear_body;
+ $self->expecting_header(0);
+}
+
+sub DEMOLISH {
+ my ($self) = @_;
+
+ $self->server->housekeeping;
+}
+
+1;
171 lib/RTSP/Server/Listener.pm
@@ -0,0 +1,171 @@
+package RTSP::Server::Listener;
+
+use Moose::Role;
+use namespace::autoclean;
+
+use AnyEvent::Handle;
+use AnyEvent::Socket;
+
+use RTSP::Server::Source::Connection;
+use RTSP::Server::Client::Connection;
+
+has 'listen_address' => (
+ is => 'rw',
+ isa => 'Str',
+ required => 1,
+);
+
+has 'listen_port' => (
+ is => 'rw',
+ isa => 'Int',
+ required => 1,
+);
+
+has 'listener' => (
+ is => 'rw',
+);
+
+has 'connection_class' => (
+ is => 'ro',
+ isa => 'Str',
+ required => 1,
+);
+
+has 'next_connection_id' => (
+ is => 'rw',
+ isa => 'Int',
+ default => 1,
+);
+
+# map of id => $connection
+has 'connections' => (
+ is => 'rw',
+ isa => 'HashRef',
+ default => sub { {} },
+ lazy => 1,
+);
+
+has 'server' => (
+ is => 'ro',
+ isa => 'RTSP::Server',
+ required => 1,
+ handles => [qw/ mounts mount get_mount unmount get_mount_path
+ trace debug info warn error /],
+);
+
+sub listen {
+ my ($self) = @_;
+
+ my $bind_ip = $self->listen_address;
+ my $bind_port = $self->listen_port;
+ my $conn_class = $self->connection_class;
+
+ my $listener = tcp_server $bind_ip, $bind_port, sub {
+ my ($fh, $rhost, $rport) = @_;
+
+ $self->info("$conn_class connection from $rhost:$rport");
+
+ # create object to track client
+ my $conn = "RTSP::Server::${conn_class}::Connection"->new(
+ id => $self->next_connection_id,
+ client_address => $rhost,
+ client_port => $rport,
+ server => $self->server,
+ );
+
+ $self->next_connection_id($self->next_connection_id + 1);
+
+ my $handle;
+ my $cleanup = sub {
+ delete $self->connections->{$conn->id};
+ $handle->destroy;
+ undef $handle;
+ };
+
+ $handle = new AnyEvent::Handle
+ fh => $fh,
+ on_eof => sub {
+ $self->debug("Got EOF on listener");
+ $cleanup->();
+ },
+ on_error => sub {
+ my (undef, $fatal, $msg) = @_;
+
+ $self->error("Got " . ($fatal ? 'fatal ' : '') .
+ "error on $conn_class listener socket: $msg");
+ $cleanup->();
+ },
+ on_read => sub {
+ $handle->push_read(
+ line => sub {
+ my (undef, $line, $eol) = @_;
+
+ $self->trace("$conn_class listener: >> $line");
+
+ # parse line of request
+ if (! $conn->current_method) {
+ # expecting method, URI, RTSP/1.0
+ my ($method, $uri, $version) = $line =~ m/
+ ^\s*(\w+)\s+ # method
+ (?:(.+)\s+)? # optional uri
+ RTSP\/([\d\.]+)\s*$ # version
+ /ix;
+
+ unless ($method && $version) {
+ $self->error("Unable to parse request '$line'");
+ $conn->push_response(400, "Bad Request");
+ return;
+ }
+
+ $self->debug("Got method $method");
+
+ $conn->current_method($method);
+ $conn->req_uri($uri) if $uri;
+ $conn->expecting_header(1);
+ } elsif ($conn->expecting_header) {
+ # expecting header
+ if (! $line) {
+ # end of headers
+ $self->trace("End of headers");
+
+ $conn->expecting_header(0);
+
+ # did we get content-length? if so, we are expecting the body
+ my $length = $conn->req_content_length;
+ if ($length) {
+ $handle->push_read(chunk => $length, sub {
+ my (undef, $data) = @_;
+
+ $self->trace("Finished reading body, length=$length");
+ $conn->body($data);
+
+ $conn->request_finished;
+ });
+ } else {
+ $conn->request_finished;
+ }
+ } else {
+ # we got a header
+ my ($header, $value) = $line =~ m/\s*([-\w]+)\s*:\s+(.*)$/;
+ $self->trace("Got header $header, value $value");
+
+ $conn->add_req_header($header, $value);
+ }
+ } else {
+ # not expecting header, not expecting method. should not get here.
+ $self->error("Unable to parse line: $line");
+ }
+ },
+ );
+ };
+
+ $conn->h($handle);
+
+ # save connection object
+ $self->connections->{$conn->id} = $conn;
+ } or die $!;
+
+ $self->listener($listener);
+}
+
+1;
51 lib/RTSP/Server/Logger.pm
@@ -0,0 +1,51 @@
+package RTSP::Server::Logger;
+
+use Moose;
+use namespace::autoclean;
+
+has 'log_level' => (
+ is => 'rw',
+ isa => 'Int',
+ default => 2,
+);
+
+sub log {
+ my ($self, $level, $msg) = @_;
+
+ return if $level > $self->log_level;
+
+ if ($level > 2) {
+ # info/debug go to stdout
+ print "$msg\n";
+ } else {
+ # warn/error go to stderr
+ warn "$msg\n";
+ }
+}
+
+sub trace {
+ my ($self, $msg) = @_;
+ return $self->log(5, $msg);
+}
+
+sub debug {
+ my ($self, $msg) = @_;
+ return $self->log(4, $msg);
+}
+
+sub info {
+ my ($self, $msg) = @_;
+ return $self->log(3, $msg);
+}
+
+sub warn {
+ my ($self, $msg) = @_;
+ return $self->log(2, $msg);
+}
+
+sub error {
+ my ($self, $msg) = @_;
+ return $self->log(1, $msg);
+}
+
+__PACKAGE__->meta->make_immutable;
52 lib/RTSP/Server/Mount.pm
@@ -0,0 +1,52 @@
+package RTSP::Server::Mount;
+
+use Moose;
+use namespace::autoclean;
+
+has 'path' => (
+ is => 'ro',
+ isa => 'Str',
+ required => 1,
+);
+
+has 'sdp' => (
+ is => 'rw',
+ required => 1,
+);
+
+has 'mounted' => (
+ is => 'rw',
+ isa => 'Bool',
+ default => 0,
+);
+
+# map of session_id -> client connection
+has 'clients' => (
+ is => 'rw',
+ isa => 'HashRef',
+ default => sub { {} },
+);
+
+sub add_client {
+ my ($self, $client) = @_;
+
+ $self->clients->{$client->session_id} = $client;
+}
+
+sub remove_client {
+ my ($self, $client) = @_;
+
+ delete $self->clients->{$client->session_id};
+}
+
+# broadcast a video packet to all clients
+sub broadcast {
+ my ($self, $pkt) = @_;
+
+ foreach my $client (values %{ $self->clients }) {
+ $client->send_packet($pkt);
+ }
+}
+
+__PACKAGE__->meta->make_immutable;
+
100 lib/RTSP/Server/RTPListener.pm
@@ -0,0 +1,100 @@
+package RTSP::Server::RTPListener;
+
+use Moose;
+use namespace::autoclean;
+
+use AnyEvent::Util;
+use Socket;
+
+has 'mount' => (
+ is => 'ro',
+ isa => 'RTSP::Server::Mount',
+ required => 1,
+);
+
+has 'host' => (
+ is => 'ro',
+ isa => 'Str',
+ required => 1,
+);
+
+has 'port' => (
+ is => 'ro',
+ isa => 'Int',
+ required => 1,
+);
+
+has 'read_size' => (
+ is => 'rw',
+ isa => 'Int',
+ default => 1420,
+);
+
+has 'watcher' => (
+ is => 'rw',
+);
+
+has 'socket' => (
+ is => 'rw',
+);
+
+sub listen {
+ my ($self) = @_;
+
+ # create UDP listener socket
+ my($name, $alias, $udp_proto) = AnyEvent::Socket::getprotobyname('udp');
+ socket my($sock), PF_INET, SOCK_DGRAM, $udp_proto;
+ AnyEvent::Util::fh_nonblocking $sock, 1;
+
+ unless (bind $sock, sockaddr_in($self->port, Socket::inet_aton($self->host))) {
+ warn("Error binding UDP listener to port " . $self->port . ": $!");
+ return;
+ }
+
+ $self->socket($sock);
+
+ my $buf;
+ my $read_size = $self->read_size;
+
+ my $w; $w = AnyEvent->io(
+ fh => $sock,
+ poll => 'r', cb => sub {
+ my $sender_addr = recv $sock, $buf, $read_size, 0;
+
+ # TODO: compare $sender_addr to expected addr
+
+ if (! defined $sender_addr) {
+ # error
+ $self->error("Error receiving RTP data");
+ undef $w;
+
+ return;
+ }
+
+ next unless $buf;
+
+ $self->mount->broadcast($buf);
+ }
+ );
+
+ $self->watcher($w);
+
+ # TODO: send UDP packet every 30 seconds to keep stateful UDP
+ # firewalls open
+
+ return 1;
+}
+
+sub DEMOLISH {
+ my ($self) = @_;
+
+ if ($self->socket) {
+ shutdown $self->socket, 2;
+ }
+
+ if ($self->child) {
+ $self->child->kill(2); # SIGINT
+ }
+}
+
+__PACKAGE__->meta->make_immutable;
59 lib/RTSP/Server/Session.pm
@@ -0,0 +1,59 @@
+package RTSP::Server::Session;
+
+use Moose;
+use namespace::autoclean;
+
+our $uniq = 1;
+
+has 'rtp_start_port' => (
+ is => 'rw',
+ isa => 'Int',
+ required => 1,
+);
+
+has 'rtp_end_port' => (
+ is => 'rw',
+ isa => 'Int',
+ lazy => 1,
+ builder => 'build_rtp_end_port',
+);
+
+has 'session_id' => (
+ is => 'rw',
+ isa => 'Str',
+ lazy => 1,
+ builder => 'build_session_id',
+);
+
+sub build_rtp_end_port {
+ my ($self) = @_;
+ return $self->rtp_start_port + 1;
+}
+
+sub rtp_port_range {
+ my ($self) = @_;
+
+ my (@rtp_listen_ports) = $self->get_rtp_listen_ports;
+ my $port_range = $rtp_listen_ports[0] . '-' .
+ $rtp_listen_ports[scalar @rtp_listen_ports - 1];
+
+ return $port_range;
+}
+
+sub get_rtp_listen_ports {
+ my ($self) = @_;
+
+ return ( $self->rtp_start_port .. $self->rtp_end_port );
+}
+
+sub build_session_id {
+ my ($self) = @_;
+
+ return $uniq++;
+}
+
+sub start_rtp_listener {
+ my ($self) = @_;
+}
+
+__PACKAGE__->meta->make_immutable;
17 lib/RTSP/Server/Source.pm
@@ -0,0 +1,17 @@
+# This class represents a server which listens and accepts source
+# requests to publish a video stream
+
+package RTSP::Server::Source;
+
+use Moose;
+ with 'RTSP::Server::Listener';
+
+use namespace::autoclean;
+
+has 'connection_class' => (
+ is => 'ro',
+ isa => 'Str',
+ default => 'Source',
+);
+
+__PACKAGE__->meta->make_immutable;
169 lib/RTSP/Server/Source/Connection.pm
@@ -0,0 +1,169 @@
+# This class represents a source connection, which can publish to a
+# video stream
+
+package RTSP::Server::Source::Connection;
+
+use Moose;
+ with 'RTSP::Server::Connection';
+
+use namespace::autoclean;
+use RTSP::Server::RTPListener;
+
+has 'rtp_listeners' => (
+ is => 'rw',
+ isa => 'ArrayRef',
+ default => sub { [] },
+ lazy => 1,
+);
+
+around 'public_options' => sub {
+ my ($orig, $self) = @_;
+
+ return ($self->$orig, qw/SETUP ANNOUNCE RECORD/);
+};
+
+# cleanup
+before 'teardown' => sub {
+ my ($self) = @_;
+
+ my $mount = $self->get_mount;
+
+ if ($mount) {
+ # TODO: notify clients connected to mount that it is closing
+
+ # should make sure stream is unmounted
+ $self->unmount;
+ }
+
+ $self->end_rtp_server;
+};
+
+sub start_rtp_server {
+ my ($self) = @_;
+
+ my $mount = $self->get_mount
+ or return;
+
+ $self->debug("Starting RTP listeners");
+ my @ports = $self->get_rtp_listen_ports;
+
+ my $ok = 0;
+ foreach my $port (@ports) {
+ $self->debug(" -> port $port");
+
+ my $listener = RTSP::Server::RTPListener->new(
+ mount => $mount,
+ host => $self->server->source_listen_address,
+ port => $port,
+ );
+
+ push @{ $self->rtp_listeners }, $listener;
+
+ unless ($listener->listen) {
+ $self->error("Failed to create RTP listener on port $port");
+ return;
+ }
+
+ $ok = 1;
+ }
+
+ return $ok;
+}
+
+sub end_rtp_server {
+ my ($self) = @_;
+
+ my $listeners = $self->rtp_listeners;
+ return unless @$listeners;
+
+ $self->debug("Shutting down RTP listeners");
+ foreach my $listener (@$listeners) {
+ $self->debug(" -> port " . $listener->port);
+ }
+
+ $self->rtp_listeners([]);
+}
+
+sub record {
+ my ($self) = @_;
+
+ my $mount = $self->get_mount;
+ unless ($mount) {
+ return $self->not_found;
+ }
+
+ $self->debug("Got record for mountpoint " . $mount->path);
+
+ if ($self->start_rtp_server) {
+ $self->push_ok;
+ $mount->mounted(1);
+ } else {
+ $self->not_found;
+ }
+}
+
+sub announce {
+ my ($self) = @_;
+
+ # we should have SDP data in the body
+ my $body = $self->body
+ or return $self->bad_request;
+
+ my $mount = $self->get_mount;
+
+ if ($mount) {
+ # mount is in use. return error.
+ $self->info("Source attempting to announce mountpoint " .
+ $mount->path . ', but it is already in use');
+ return $self->push_response(403, 'Forbidden');
+ }
+
+ $self->debug("Got source announcement for " . $self->req_uri);
+
+ # create mountpoint
+ my $mount_path = $self->get_mount_path($self->req_uri)
+ or return $self->bad_request;
+
+ $mount = $self->mount(
+ path => $mount_path,
+ sdp => $body,
+ );
+
+ unless ($mount) {
+ $self->error("Failed to mount stream at $mount_path");
+ return $self->bad_request;
+ }
+
+ $self->push_ok;
+
+ # TODO: broadcast announcement to all connected clients
+}
+
+sub setup {
+ my ($self) = @_;
+
+ my $mount_path = $self->get_mount_path
+ or return $self->not_found;
+
+ # does a mount exist? RTSP spec (10.4) says a client can issue a
+ # SETUP for an existing stream to change the params.
+ my $mount = $self->get_mount($mount_path);
+ if ($mount && $mount->mounted) {
+ # well, we don't support that yet.
+ $self->debug("SETUP request for $mount_path, but the mountpoint is in use");
+ return $self->push_response(455, 'Method Not Valid In This State');
+ }
+
+ # should have transport header
+ my $transport = $self->get_req_header('Transport')
+ or return $self->bad_request;
+
+ # add our RTP ports to transport header response
+ my $port_range = $self->rtp_port_range;
+ $self->add_resp_header("Transport", "$transport;server_port=$port_range");
+
+ $self->push_ok;
+}
+
+__PACKAGE__->meta->make_immutable;
+
27 rtsp-server.pl
@@ -0,0 +1,27 @@
+#!/usr/bin/env perl
+
+use strict;
+use warnings;
+use lib 'lib';
+
+use AnyEvent;
+use RTSP::Server;
+
+my $srv = new RTSP::Server(
+ mount_points => [qw/ stream1.rtsp stream2.rtsp /],
+ max_clients => 10,
+ log_level => 4,
+);
+
+# listen and accept incoming connections
+$srv->listen;
+
+# main loop
+my $cv = AnyEvent->condvar;
+
+# end if interrupt
+$SIG{INT} = sub {
+ $cv->send;
+};
+
+$cv->recv;
15 t/RTSP-Server.t
@@ -0,0 +1,15 @@
+# Before `make install' is performed this script should be runnable with
+# `make test'. After `make install' it should work as `perl RTSP-Server.t'
+
+#########################
+
+# change 'tests => 1' to 'tests => last_test_to_print';
+
+use Test::More tests => 1;
+BEGIN { use_ok('RTSP::Server') };
+
+#########################
+
+# Insert your test code below, the Test::More module is use()ed here so read
+# its man page ( perldoc Test::More ) for help writing this test script.
+

0 comments on commit 5bc43c2

Please sign in to comment.
Something went wrong with that request. Please try again.