/
RTPListener.pm
102 lines (76 loc) · 1.82 KB
/
RTPListener.pm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package RTSP::Server::RTPListener;
use Moose;
use namespace::autoclean;
use AnyEvent::Util;
use Socket;
has 'mount' => (
is => 'ro',
isa => 'RTSP::Server::Mount',
required => 1,
);
has 'stream' => (
is => 'ro',
isa => 'RTSP::Server::Mount::Stream',
required => 1,
);
has 'host' => (
is => 'ro',
isa => 'Str',
required => 1,
);
has 'port' => (
is => 'ro',
isa => 'Int',
required => 1,
);
has 'read_size' => (
is => 'rw',
isa => 'Int',
default => 1500,
);
has 'watcher' => (
is => 'rw',
);
has 'socket' => (
is => 'rw',
);
sub listen {
my ($self) = @_;
# create UDP listener socket
my($name, $alias, $udp_proto) = AnyEvent::Socket::getprotobyname('udp');
socket my($sock), PF_INET, SOCK_DGRAM, $udp_proto;
AnyEvent::Util::fh_nonblocking $sock, 1;
unless (bind $sock, sockaddr_in($self->port, Socket::inet_aton($self->host))) {
warn("Error binding UDP listener to port " . $self->port . ": $!");
return;
}
$self->socket($sock);
my $buf;
my $read_size = $self->read_size;
my $w; $w = AnyEvent->io(
fh => $sock,
poll => 'r', cb => sub {
my $sender_addr = recv $sock, $buf, $read_size, 0;
# TODO: compare $sender_addr to expected addr
if (! defined $sender_addr) {
# error
$self->error("Error receiving RTP data");
undef $w;
return;
}
next unless $buf;
$self->stream->broadcast($buf);
}
);
$self->watcher($w);
# TODO: send UDP packet every 30 seconds to keep stateful UDP
# firewalls open
return 1;
}
sub DEMOLISH {
my ($self) = @_;
if ($self->socket) {
shutdown $self->socket, 2;
}
}
__PACKAGE__->meta->make_immutable;