Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Add loads of debugging stuff

  • Loading branch information...
commit 176f70e583474df2667641195d104dbd67952b93 1 parent 8bd8a06
@sanko authored
Showing with 178 additions and 37 deletions.
  1. +178 −37 lib/AnyEvent/BitTorrent.pm
View
215 lib/AnyEvent/BitTorrent.pm
@@ -32,6 +32,7 @@ sub _build_socket {
my $s = shift;
tcp_server undef, $s->port, sub {
my ($fh, $host, $port) = @_;
+ AE::log info => 'Accepted connection from %s:%d', $host, $port;
return $fh->destroy if $s->state eq 'stopped';
my $handle = AnyEvent::Handle->new(
fh => $fh,
@@ -39,21 +40,24 @@ sub _build_socket {
my ($hdl, $fatal, $msg) = @_;
# XXX - callback
- #AE::log error => "got error $msg\n";
+ AE::log error => 'Socket error: ' . $msg;
$s->_del_peer($hdl);
},
on_eof => sub {
my $h = shift;
+ AE::log info => 'Socket EOF';
$s->_del_peer($h);
},
- on_read => sub { $s->_on_read_incoming(@_) }
+ on_read => sub {
+ AE::log debug => 'Read Socket';
+ $s->_on_read_incoming(@_);
+ }
);
$s->_add_peer($handle);
}, sub {
my ($fh, $thishost, $thisport) = @_;
$s->_set_port($thisport);
-
- #AE::log info => "bound to $thishost, port $thisport";
+ AE::log info => "bound to $thishost, port $thisport";
};
}
has path => (
@@ -73,6 +77,7 @@ sub _build_reserved {
#vec($reserved, 5, 8) = 0x10; # Ext Protocol
vec($reserved, 7, 8) = 0x04; # Fast Ext
+ AE::log debug => '_build_reserved() => ' . $reserved;
$reserved;
}
has peerid => (
@@ -119,6 +124,7 @@ sub wanted {
vec($wanted, $index, 1) = $prio && !vec($s->bitfield, $index, 1);
}
}
+ AE::log debug => '->wanted() => ' . unpack 'b*', $wanted;
$wanted;
}
@@ -166,6 +172,10 @@ sub _build_metadata {
open my $fh, '<', $s->path;
sysread $fh, my $raw, -s $fh;
my $metadata = bdecode $raw;
+ AE::log debug => sub {
+ require Data::Dump;
+ '_build_metadata() => ' . Data::Dump::dump($metadata);
+ };
$metadata;
}
sub name { shift->metadata->{info}{name} }
@@ -230,18 +240,24 @@ sub _build_size {
my $s = shift;
my $ret = 0;
$ret += $_->{length} for @{$s->files};
+ AE::log debug => '_build_size() => ' . $ret;
$ret;
}
sub _open {
my ($s, $i, $m) = @_;
+ AE::log
+ trace => 'Opening file #%d (%s) for %s',
+ $i, $s->files->[$i]->{path}, $m;
return 1 if $s->files->[$i]->{mode} eq $m;
if (defined $s->files->[$i]->{fh}) {
+ AE::log trace => 'Closing %s', $s->files->[$i]->{fh};
flock $s->files->[$i]->{fh}, LOCK_UN;
close $s->files->[$i]->{fh};
$s->files->[$i]->{fh} = ();
}
if ($m eq 'r') {
+ AE::log trace => 'Opening %s to read', $s->files->[$i]->{path};
sysopen($s->files->[$i]->{fh}, $s->files->[$i]->{path}, O_RDONLY)
|| return;
flock($s->files->[$i]->{fh}, LOCK_SH) || return;
@@ -251,6 +267,7 @@ sub _open {
= AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
}
elsif ($m eq 'w') {
+ AE::log trace => 'Opening %s to write', $s->files->[$i]->{path};
my @split = File::Spec->splitdir($s->files->[$i]->{path});
pop @split; # File name itself
my $dir = File::Spec->catdir(@split);
@@ -274,16 +291,23 @@ sub _open {
}
has piece_cache => (is => 'ro', isa => 'HashRef', default => sub { {} });
+sub _cache_path {
+ my $s = shift;
+ File::Spec->catfile($s->basedir,
+ (scalar @{$s->files} == 1 ? () : $s->name),
+ '~ABPartFile_-'
+ . uc(substr(unpack('H*', $s->infohash), 0, 10))
+ . '.dat'
+ );
+}
+
sub _write_cache {
my ($s, $f, $o, $d) = @_;
- my $path =
- File::Spec->catfile($s->basedir,
- (scalar @{$s->files} == 1 ? () : $s->name),
- '~ABPartFile_-'
- . uc(
- substr(unpack('H*', $s->infohash), 0, 10))
- . '.dat'
- );
+ my $path = $s->_cache_path;
+ AE::log
+ debug =>
+ 'Attempting to store %d bytes to cache file (%s) [$f=%s, $o=%s]',
+ length($d), $path, $f, $o;
my @split = File::Spec->splitdir($path);
pop @split; # File name itself
my $dir = File::Spec->catdir(@split);
@@ -296,6 +320,7 @@ sub _write_cache {
flock $fh, LOCK_UN;
close $fh;
$s->piece_cache->{$f}{$o} = $pos;
+ AE::log debug => 'Wrote %d bytes to cache file', $w;
return $w;
}
@@ -303,14 +328,11 @@ sub _read_cache {
my ($s, $f, $o, $l) = @_;
$s->piece_cache->{$f} // return;
$s->piece_cache->{$f}{$o} // return;
- my $path =
- File::Spec->catfile($s->basedir,
- (scalar @{$s->files} == 1 ? () : $s->name),
- '~ABPartFile_-'
- . uc(
- substr(unpack('H*', $s->infohash), 0, 10))
- . '.dat'
- );
+ my $path = $s->_cache_path;
+ AE::log
+ debug =>
+ 'Attempting to read %d bytes from cache file (%s) [$f=%s, $o=%s]',
+ $l, $path, $f, $o;
sysopen(my ($fh), $path, O_RDONLY)
|| return;
flock $fh, LOCK_SH;
@@ -323,6 +345,10 @@ sub _read_cache {
sub _read {
my ($s, $index, $offset, $length) = @_;
+ AE::log
+ debug =>
+ 'Attempting to read %d bytes from piece %d starting at %d bytes',
+ $length, $index, $offset;
my $data = '';
my $file_index = 0;
my $total_offset = ($index * $s->piece_length) + $offset;
@@ -330,6 +356,10 @@ SEARCH:
while ($total_offset > $s->files->[$file_index]->{length}) {
$total_offset -= $s->files->[$file_index]->{length};
$file_index++;
+ AE::log
+ trace =>
+ 'Searching for location... $total_offset = %d, $file_index = %d',
+ $total_offset, $file_index;
last SEARCH # XXX - return?
if not defined $s->files->[$file_index]->{length};
}
@@ -340,15 +370,25 @@ READ: while ((defined $length) && ($length > 0)) {
?
($s->files->[$file_index]->{length} - $total_offset)
: $length;
+ AE::log
+ trace =>
+ 'Attempting to read %d bytes from file #%d (%s), starting at %d',
+ $this_read,
+ $file_index, $s->files->[$file_index]->{path}, $total_offset;
if ( (!-f $s->files->[$file_index]->{path})
|| (!$s->_open($file_index, 'r')))
{ $data .= $s->_read_cache($file_index, $total_offset, $this_read)
// ("\0" x $this_read);
+ AE::log note => 'Failed to open file. Using null chars instead.';
}
else {
sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
sysread $s->files->[$file_index]->{fh}, my ($_data), $this_read;
$data .= $_data if $_data;
+ AE::log
+ trace =>
+ 'Read %d bytes of data from file (%d bytes collected so far)',
+ length $_data, length $data;
weaken $s;
my $x = $file_index;
$s->files->[$x]->{timeout}
@@ -356,20 +396,35 @@ READ: while ((defined $length) && ($length > 0)) {
}
$file_index++;
$length -= $this_read;
+ AE::log
+ trace => 'Still need to read %d bytes',
+ $length;
last READ if not defined $s->files->[$file_index];
$total_offset = 0;
}
+ AE::log trace => 'Returning %d bytes of data', length $data;
return $data;
}
sub _write {
my ($s, $index, $offset, $data) = @_;
+ AE::log
+ debug =>
+ 'Attempting to write %d bytes from piece %d starting at %d bytes',
+ length($data), $index, $offset;
my $file_index = 0;
my $total_offset = int(($index * $s->piece_length) + ($offset || 0));
+ AE::log
+ debug => '...calculated offset == %d',
+ $total_offset;
SEARCH:
while ($total_offset > $s->files->[$file_index]->{length}) {
$total_offset -= $s->files->[$file_index]->{length};
$file_index++;
+ AE::log
+ trace =>
+ 'Searching for location... $total_offset = %d, $file_index = %d',
+ $total_offset, $file_index;
last SEARCH # XXX - return?
if not defined $s->files->[$file_index]->{length};
}
@@ -380,15 +435,24 @@ WRITE: while ((defined $data) && (length $data > 0)) {
?
($s->files->[$file_index]->{length} - $total_offset)
: length $data;
+ AE::log
+ trace =>
+ 'Attempting to write %d bytes from file #%d (%s), starting at %d',
+ $this_write,
+ $file_index, $s->files->[$file_index]->{path}, $total_offset;
if ($s->files->[$file_index]->{priority} == 0) {
$s->_write_cache($file_index, $total_offset, substr $data, 0,
$this_write, '');
+ AE::log trace => 'Wrote data to cache...';
}
else {
- $s->_open($file_index, 'w') || die $!;
+ $s->_open($file_index, 'w');
sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
my $w = syswrite $s->files->[$file_index]->{fh}, substr $data, 0,
$this_write, '';
+ AE::log
+ trace => 'Wrote %d bytes of data to file (%d bytes left)',
+ $w, length $data;
weaken $s;
my $x = $file_index;
$s->files->[$x]->{timeout}
@@ -404,19 +468,32 @@ WRITE: while ((defined $data) && (length $data > 0)) {
sub hashcheck (;@) {
my $s = shift;
my @indexes = @_ ? @_ : (0 .. $s->piece_count);
+ AE::log trace => sub {
+ require Data::Dump;
+ 'Hashcheck of : ' . Data::Dump::dump(\@indexes);
+ };
$s->bitfield; # Makes sure it's built
+ my $total_size = $s->size;
for my $index (@indexes) {
next if $index < 0 || $index > $s->piece_count;
my $piece = $s->_read($index,
0,
$index == $s->piece_count
?
- $s->size % $s->piece_length
+ $total_size % $s->piece_length
: $s->piece_length
);
- my $ok = defined($piece)
- && (substr($s->pieces, $index * 20, 20) eq sha1($piece));
+ my $expected = substr($s->pieces, $index * 20, 20);
+ my $reality = sha1($piece);
+ my $ok = defined($piece)
+ && ($expected eq $reality);
vec($s->{bitfield}, $index, 1) = $ok;
+ AE::log trace => sub {
+ "Validate piece #%06d %s, Expected: %s\n"
+ . " Reality: %s",
+ $index, ($ok ? 'PASS' : 'FAIL'), unpack('H*', $expected),
+ unpack('H*', $reality);
+ };
$ok ?
$s->_trigger_hash_pass($index)
: $s->_trigger_hash_fail($index);
@@ -531,8 +608,16 @@ has trackers => (
? @{$s->metadata->{'announce-list'}}
: ()
];
+ AE::log trace => sub {
+ require Data::Dump;
+ '$trackers before shuffle => ' . Data::Dump::dump($trackers);
+ };
$shuffle->($trackers);
$shuffle->($_->{urls}) for @$trackers;
+ AE::log trace => sub {
+ require Data::Dump;
+ '$trackers after shuffle => ' . Data::Dump::dump($trackers);
+ };
$trackers;
}
);
@@ -553,7 +638,7 @@ sub _announce_tier {
return if $tier->{urls}[0] !~ m[^https?://.+];
local $AnyEvent::HTTP::USERAGENT
= 'AnyEvent::BitTorrent/' . $AnyEvent::BitTorrent::VERSION;
- http_get $tier->{urls}[0] . '?info_hash=' . sub {
+ my $_url = $tier->{urls}[0] . '?info_hash=' . sub {
local $_ = shift;
s/([\W])/"%" . uc(sprintf("%2.2x",ord($1)))/eg;
$_;
@@ -565,8 +650,15 @@ sub _announce_tier {
. ('&left=' . $s->_left)
. ('&port=' . $s->port)
. '&compact=1'
- . ($e ? '&event=' . $e : ''), sub {
+ . ($e ? '&event=' . $e : '');
+ AE::log debug => 'Announce URL: ' . $_url;
+ http_get $_url, sub {
my ($body, $hdr) = @_;
+ AE::log trace => sub {
+ require Data::Dump;
+ 'Announce response: ' . Data::Dump::dump($body, $hdr);
+ };
+ $tier->{announcer} = ();
if ($hdr->{Status} =~ /^2/) {
my $reply = bdecode($body);
if (defined $reply->{'failure reason'}) { # XXX - Callback?
@@ -617,6 +709,7 @@ has _choke_timer => (
15, 45,
sub {
return if $s->state ne 'active';
+ AE::log trace => 'Choke timer...';
my @interested
= grep { $_->{remote_interested} && $_->{remote_choked} }
values %{$s->peers};
@@ -625,6 +718,7 @@ has _choke_timer => (
for my $p (@interested) {
$p->{remote_choked} = 0;
$s->_send_encrypted($p->{handle}, build_unchoke());
+ AE::log trace => 'Choked %s', $p->{peerid};
}
# XXX - Send choke to random peer
@@ -640,9 +734,10 @@ has _fill_requests_timer => (
default => sub {
my $s = shift;
AE::timer(
- 15, 1,
+ 15, 10,
sub { # XXX - Limit by time/bandwidth
return if $s->state ne 'active';
+ AE::log trace => 'Request fill timer...';
my @waiting
= grep { defined && scalar @{$_->{remote_requests}} }
values %{$s->peers};
@@ -650,8 +745,13 @@ has _fill_requests_timer => (
my $total_sent = 0;
while (@waiting && $total_sent < 2**20) {
my $p = splice(@waiting, rand @waiting, 1, ());
+ AE::log trace => 'Chosen peer: %s...', $p->{peerid};
while ($total_sent < 2**20 && @{$p->{remote_requests}}) {
my $req = shift @{$p->{remote_requests}};
+ AE::log
+ trace =>
+ 'Filling request i:%d, o:%d, l:%d for %s',
+ @$req;
# XXX - If piece is bad locally
# if remote supports fast ext
@@ -688,6 +788,7 @@ sub _build_peer_timer {
1, 15,
sub {
return if !$s->_left;
+ AE::log trace => 'Attempting to connect to new peer...';
# XXX - Initiate connections when we are in Super seed mode?
my @cache = map {
@@ -702,6 +803,7 @@ sub _build_peer_timer {
last
if scalar(keys %{$s->peers}) > 100; # XXX - Max peers
my $addr = splice @cache, rand $#cache, 1;
+ AE::log trace => 'Connecting to %s:%d', @$addr;
my $handle;
$handle = AnyEvent::Handle->new(
connect => $addr,
@@ -710,7 +812,9 @@ sub _build_peer_timer {
my ($hdl, $fatal, $msg) = @_;
# XXX - callback
- #AE::log error => "got error $msg\n";
+ AE::log
+ error => 'Socket error: %s (Removing peer)',
+ $msg;
$s->_del_peer($hdl);
},
on_connect_error => sub {
@@ -718,22 +822,28 @@ sub _build_peer_timer {
$s->_del_peer($hdl);
# XXX - callback
- #AE::log
- # error => sprintf "%sfatal error (%s)\n",
- # $fatal ? '' : 'non-',
- # $msg // 'Connection timed out';
+ AE::log
+ error => sprintf "%sfatal error (%s)\n",
+ $fatal ? '' : 'non-',
+ $msg // 'Connection timed out';
return if !$fatal;
},
on_connect => sub {
my ($h, $host, $port, $retry) = @_;
+ AE::log
+ trace => 'Connection established with %s:%d',
+ $host, $port;
$s->_add_peer($handle);
$s->_send_handshake($handle);
},
on_eof => sub {
my $h = shift;
+ AE::log trace => 'EOF from peer';
$s->_del_peer($h);
},
- on_read => sub { $s->_on_read(@_) }
+ on_read => sub {
+ $s->_on_read(@_);
+ }
);
}
}
@@ -747,12 +857,16 @@ sub _on_read_incoming {
# XXX - Handle things if the stream is encrypted
my $packet = parse_packet(\$h->rbuf);
return if !$packet;
+ AE::log trace => sub {
+ require Data::Dump;
+ 'Incoming packet: ' . Data::Dump::dump($packet);
+ };
if (defined $packet->{error}) {
return $s->_del_peer($h);
}
elsif ($packet->{type} == $HANDSHAKE) {
ref $packet->{payload} // return;
- return if !ref $packet->{payload} eq 'ARRAY';
+ return if ref $packet->{payload} ne 'ARRAY';
$s->peers->{$h}{reserved} = $packet->{payload}[0];
return $s->_del_peer($h)
if $packet->{payload}[1] ne $s->infohash;
@@ -772,6 +886,11 @@ sub _on_read_incoming {
sub _on_read {
my ($s, $h) = @_;
while (my $packet = parse_packet(\$h->rbuf)) {
+ last if !$packet;
+ AE::log debug => sub {
+ require Data::Dump;
+ 'Incoming packet: ' . Data::Dump::dump($packet->{error});
+ };
if (defined $packet->{error}) {
$s->_del_peer($h);
return;
@@ -1079,7 +1198,9 @@ sub _request_pieces {
: $block_size;
# XXX - Limit to x req per peer (future: based on bandwidth)
- #warn sprintf 'Requesting %d, %d, %d', $index, $offset, $_block_size;
+ AE::log
+ trace => 'Requesting %d, %d, %d',
+ $index, $offset, $_block_size;
$s->_send_encrypted($p->{handle},
build_request($index, $offset, $_block_size))
; # XXX - len for last piece
@@ -1142,33 +1263,48 @@ has state => (is => 'ro',
sub stop {
my $s = shift;
+ AE::log debug => 'Stopping...';
return if $s->state eq 'stopped';
+ AE::log trace => 'Announcing "stopped" event to trackers...';
$s->announce('stopped');
+ AE::log trace => 'Disconnecting peers...';
$s->_clear_peers;
+ AE::log trace => 'Stopping new peers ticker...';
$s->_clear_peer_timer;
+ AE::log trace => 'Closing files...';
$s->_open($_, 'c') for 0 .. $#{$s->files};
+ AE::log trace => 'Setting internal status...';
$s->_set_state('stopped');
}
sub start {
my $s = shift;
+ AE::log debug => 'Starting...';
$s->announce('started') unless $s->state eq 'active';
$s->peers;
+ AE::log trace => 'Starting new peers ticker...';
$s->_peer_timer;
+ AE::log trace => 'Setting internal status...';
$s->_set_state('active');
}
sub pause {
my $s = shift;
+ AE::log debug => 'Pausing...';
$s->peers;
+ AE::log trace => 'Starting new peers ticker...';
$s->_peer_timer;
+ AE::log trace => 'Setting internal status...';
$s->_set_state('paused');
}
#
sub BUILD {
my ($s, $a) = @_;
- $s->start if $s->state eq 'active';
- $s->paused if $s->state eq 'paused';
+ AE::log debug => 'BUILD()';
+ $s->start && AE::log debug => 'Calling ->start()'
+ if $s->state eq 'active';
+ $s->paused && AE::log debug => 'Calling ->paused() '
+ if $s->state eq 'paused';
}
# Testing stuff goes here
@@ -1176,11 +1312,16 @@ sub _send_encrypted {
my ($s, $h, $packet) = @_;
return if !$h; # XXX - $s->_del_peer($p->{handle})
# XXX - Currently doesn't do anything and may never do anything
+ AE::log trace => sub {
+ require Data::Dump;
+ 'Outgoing packet: ' . Data::Dump::dump($packet);
+ };
return $h->push_write($packet);
}
sub _send_handshake {
my ($s, $h) = @_;
+ AE::log trace => 'Outgoing handshake';
# XXX - Send encrypted handshake if encryption status is unknown or true
$h->push_write(build_handshake($s->reserved, $s->infohash, $s->peerid));
Please sign in to comment.
Something went wrong with that request. Please try again.