Skip to content


Improve the send_peer process.
Browse files Browse the repository at this point in the history
Improvement of the way we handle startup. Get rid of peer_sup and link recv and send directly
into the control process. This in turn eliminates certain errors in the crash log and it makes the
system work better.
  • Loading branch information
jlouis committed Oct 20, 2012
1 parent f1a1513 commit 1fd7a35
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 68 deletions.
104 changes: 52 additions & 52 deletions src/etorrent_peer_control.erl
Expand Up @@ -68,9 +68,9 @@

%% @doc Register the current process as a peer process
register_server(TorrentID, Socket) ->
register_server(TorrentId, Socket) ->

%% @doc Lookup the process id of a specific peer.
lookup_server(Socket) ->
Expand All @@ -82,17 +82,17 @@ await_server(Socket) ->

%% @doc
-spec lookup_peers(torrentid()) -> [pid()].
lookup_peers(TorrentID) ->
lookup_peers(TorrentId) ->

%% @doc Name of a specific peer process
server_name(Socket) ->
{etorrent, Socket, peer}.

%& @doc Name of all peers in a torrent
group_name(TorrentID) ->
{etorrent, TorrentID, peers}.
group_name(TorrentId) ->
{etorrent, TorrentId, peers}.

Expand Down Expand Up @@ -244,11 +244,11 @@ poll_local_rqueue(Download, SendPid, Remote, Local) ->
%% A new chunk read should be started when a REQUEST message is pushed into an
%% empty request queue. The calling code is expected to only call this function
%% when the local peer is expected to send a PIECE request.
push_remote_rqueue_hook(TorrentID, Requests) ->
push_remote_rqueue_hook(TorrentId, Requests) ->
case etorrent_rqueue:size(Requests) of
1 ->
{Piece, Offset, Length} = etorrent_rqueue:peek(Requests),
{ok, _} = etorrent_io:aread_chunk(TorrentID, Piece, Offset, Length),
{ok, _} = etorrent_io:aread_chunk(TorrentId, Piece, Offset, Length),
N when N > 1 ->
Expand All @@ -259,26 +259,26 @@ push_remote_rqueue_hook(TorrentID, Requests) ->
%% A new chunk read should be started when a REQUEST message is popped from a non
%% empty request queue. The calling code is expected to call this function with
%% the most recent version of the queue.
pop_remote_rqueue_hook(TorrentID, Requests) ->
pop_remote_rqueue_hook(TorrentId, Requests) ->
case etorrent_rqueue:size(Requests) of
0 ->
N when N > 0 ->
{Piece, Offset, Length} = etorrent_rqueue:peek(Requests),
{ok, _} = etorrent_io:aread_chunk(TorrentID, Piece, Offset, Length),
{ok, _} = etorrent_io:aread_chunk(TorrentId, Piece, Offset, Length),

%% @private
init([TrackerUrl, LocalPeerID, InfoHash, TorrentID, {IP, Port}, Caps, Socket]) ->
init([TrackerUrl, LocalPeerID, InfoHash, TorrentId, {IP, Port}, Caps, Socket]) ->
%% Use socket handle as remote peer-id.
register_server(TorrentID, Socket),
Download = etorrent_download:await_servers(TorrentID),
register_server(TorrentId, Socket),
Download = etorrent_download:await_servers(TorrentId),

%% Keep track of the local state and the remote state
TorrentPid = etorrent_torrent_ctl:await_server(TorrentID),
TorrentPid = etorrent_torrent_ctl:await_server(TorrentId),
{ok, Valid} = etorrent_torrent_ctl:valid_pieces(TorrentPid),
Numpieces = etorrent_pieceset:capacity(Valid),
Local0 = etorrent_peerstate:new(Numpieces, 3, 16),
Expand All @@ -291,13 +291,13 @@ init([TrackerUrl, LocalPeerID, InfoHash, TorrentID, {IP, Port}, Caps, Socket]) -
Config = etorrent_peerconf:extended(Extended, Config1),

ok = etorrent_table:new_peer(TrackerUrl, IP, Port, TorrentID, self(), leeching),
ok = etorrent_table:new_peer(TrackerUrl, IP, Port, TorrentId, self(), leeching),
ok = etorrent_choker:monitor(self()),

{ok, RecieverPid} = etorrent_peer_recv:start_link(Id, Socket),
{ok, SenderPid} = etorrent_peer_send:start_link(Id, Socket, false),
{ok, _RecieverPid} = etorrent_peer_recv:start_link(TorrentId, Socket),
{ok, _SenderPid} = etorrent_peer_send:start_link(TorrentId, Socket, []), % No extensions
State = #state{
Expand Down Expand Up @@ -326,7 +326,7 @@ handle_cast({incoming_msg, Msg}, S) ->

handle_cast(choke, State) ->
torrent_id=TorrentID, send_pid=SendPid,
torrent_id=TorrentId, send_pid=SendPid,
remote=Remote, config=Config} = State,
case etorrent_peerstate:choked(Remote) of
false ->
Expand All @@ -338,7 +338,7 @@ handle_cast(choke, State) ->
false ->
etorrent_peer_states:set_local_choke(TorrentID, self()),
etorrent_peer_states:set_local_choke(TorrentId, self()),
NewReqs = etorrent_rqueue:flush(Reqs),
TmpRemote = etorrent_peerstate:requests(NewReqs, Remote),
Expand All @@ -350,13 +350,13 @@ handle_cast(choke, State) ->

handle_cast(unchoke, State) ->
#state{torrent_id=TorrentID, send_pid=SendPid, remote=Remote} = State,
#state{torrent_id=TorrentId, send_pid=SendPid, remote=Remote} = State,
case etorrent_peerstate:choked(Remote) of
false ->
%% @todo handle duplicate unchoke?
{noreply, State};
true ->
etorrent_peer_states:set_local_unchoke(TorrentID, self()),
etorrent_peer_states:set_local_unchoke(TorrentId, self()),
NewRemote = etorrent_peerstate:choked(false, Remote),
NewState = State#state{remote=NewRemote},
Expand Down Expand Up @@ -406,7 +406,7 @@ handle_info({chunk, {fetched, Index, Offset, Length, _}}, State) ->
{noreply, NewState};

handle_info({chunk, {contents, Index, Offset, Length, Data}}, State) ->
#state{torrent_id=TorrentID, send_pid=SendPid, remote=Remote} = State,
#state{torrent_id=TorrentId, send_pid=SendPid, remote=Remote} = State,
Requests = etorrent_peerstate:requests(Remote),
Choked = etorrent_peerstate:choked(Remote),
case etorrent_rqueue:peek(Requests) of
Expand All @@ -427,8 +427,8 @@ handle_info({chunk, {contents, Index, Offset, Length, Data}}, State) ->
NewState = State#state{remote=NewRemote},
ok = etorrent_peer_send:piece(SendPid, Index, Offset, Length, Data),
% Already in peer_send? It cause double-rating!
% ok = etorrent_torrent:statechange(TorrentID, [{add_upload, Length}]),
ok = pop_remote_rqueue_hook(TorrentID, NewRequests),
% ok = etorrent_torrent:statechange(TorrentId, [{add_upload, Length}]),
ok = pop_remote_rqueue_hook(TorrentId, NewRequests),
{noreply, NewState};
%% Same as clause #2. Peer returned to unchoked state. Non empty queue
%% while choked is considered invalid. It should have been flushed.
Expand Down Expand Up @@ -515,11 +515,11 @@ handle_message(keep_alive, S) ->
{ok, S};
handle_message(choke, State) ->
download=Download} = State,
ok = etorrent_peer_states:set_choke(TorrentID, self()),
ok = etorrent_peer_states:set_choke(TorrentId, self()),
NewState = case etorrent_peerconf:fast(Config) of
true ->
%% If the Fast Extension is enabled a CHOKE message does
Expand All @@ -531,7 +531,7 @@ handle_message(choke, State) ->
Requests = etorrent_peerstate:requests(Local),
Pieces = etorrent_rqueue:pieces(Requests),
Chunks = etorrent_rqueue:to_list(Requests),
Peers = etorrent_peer_control:lookup_peers(TorrentID),
Peers = etorrent_peer_control:lookup_peers(TorrentId),
ok = etorrent_piecestate:unassigned(Pieces, Peers),
ok = etorrent_download:chunks_dropped(Chunks, Download),
NewReqs = etorrent_rqueue:flush(Requests),
Expand All @@ -542,32 +542,32 @@ handle_message(choke, State) ->
{ok, NewState};

handle_message(unchoke, State) ->
#state{torrent_id=TorrentID} = State,
#state{torrent_id=TorrentId} = State,
#state{send_pid=SendPid, download=Download, local=Local, remote=Remote} = State,
ok = etorrent_peer_states:set_unchoke(TorrentID, self()),
ok = etorrent_peer_states:set_unchoke(TorrentId, self()),
TmpLocal = etorrent_peerstate:choked(false, Local),
NewLocal = poll_local_rqueue(Download, SendPid, Remote, TmpLocal),
NewState = State#state{local=NewLocal},
{ok, NewState};

handle_message(interested, State) ->
#state{torrent_id=TorrentID, remote=Remote} = State,
ok = etorrent_peer_states:set_interested(TorrentID, self()),
#state{torrent_id=TorrentId, remote=Remote} = State,
ok = etorrent_peer_states:set_interested(TorrentId, self()),
ok = etorrent_peer_control:check_choke(self()),
NewRemote = etorrent_peerstate:interested(true, Remote),
NewState = State#state{remote=NewRemote},
{ok, NewState};

handle_message(not_interested, State) ->
#state{torrent_id=TorrentID, remote=Remote} = State,
ok = etorrent_peer_states:set_not_interested(TorrentID, self()),
#state{torrent_id=TorrentId, remote=Remote} = State,
ok = etorrent_peer_states:set_not_interested(TorrentId, self()),
ok = etorrent_peer_control:check_choke(self()),
NewRemote = etorrent_peerstate:interested(false, Remote),
NewState = State#state{remote=NewRemote},
{ok, NewState};

handle_message({request, Index, Offset, Length}, State) ->
#state{torrent_id=TorrentID, remote=Remote, config=Config, send_pid=SendPid} = State,
#state{torrent_id=TorrentId, remote=Remote, config=Config, send_pid=SendPid} = State,
Requests = etorrent_peerstate:requests(Remote),
NewRequests = etorrent_rqueue:push(Index, Offset, Length, Requests),
IsOverlimit = etorrent_rqueue:is_overlimit(NewRequests),
Expand Down Expand Up @@ -600,7 +600,7 @@ handle_message({request, Index, Offset, Length}, State) ->
%% PIECE message as a response to this message. If the local peer
%% chokes the remote peer before a response is sent the same rules
%% apply to this request as a requests received after the choke.
ok = push_remote_rqueue_hook(TorrentID, NewRequests),
ok = push_remote_rqueue_hook(TorrentId, NewRequests),
NewRemote = etorrent_peerstate:requests(NewRequests, Remote),
NewState = State#state{remote=NewRemote},
{ok, NewState}
Expand Down Expand Up @@ -638,62 +638,62 @@ handle_message({suggest, Piece}, State) ->
{ok, State};

handle_message({have, Piece}, State) ->
#state{torrent_id=TorrentID, send_pid=SendPid,
#state{torrent_id=TorrentId, send_pid=SendPid,
download=Download, remote=Remote, local=Local} = State,
TmpRemote = etorrent_peerstate:hasone(Piece, Remote),
Pieceset = etorrent_peerstate:pieces(TmpRemote),
%% TODO - see etorrent_peerstate:haspieces/1
HasPieces = etorrent_peerstate:haspieces(Remote),
HasPieces orelse etorrent_scarcity:add_peer(TorrentID, Pieceset),
ok = etorrent_scarcity:add_piece(TorrentID, Piece, Pieceset),
HasPieces orelse etorrent_scarcity:add_peer(TorrentId, Pieceset),
ok = etorrent_scarcity:add_piece(TorrentId, Piece, Pieceset),
TmpLocal = check_local_interest(Piece, Local, SendPid),
NewRemote = check_remote_seeder(TmpRemote, TmpLocal),
NewLocal = poll_local_rqueue(Download, SendPid, NewRemote, TmpLocal),
NewState = State#state{remote=NewRemote, local=NewLocal},
{ok, NewState};

handle_message(have_none, State) ->
#state{torrent_id=TorrentID, remote=Remote, config=Config} = State,
#state{torrent_id=TorrentId, remote=Remote, config=Config} = State,
etorrent_peerconf:fast(Config) orelse erlang:error(badarg),
NewRemote = etorrent_peerstate:hasnone(Remote),
Pieceset = etorrent_peerstate:pieces(NewRemote),
ok = etorrent_scarcity:add_peer(TorrentID, Pieceset),
ok = etorrent_scarcity:add_peer(TorrentId, Pieceset),
NewState = State#state{remote=NewRemote},
{ok, NewState};

%%IsSeeder andalso etorrent_table:statechange_peer(self(), seeder),
handle_message(have_all, State) ->
#state{torrent_id=TorrentID, send_pid=SendPid, download=Download, remote=Remote, local=Local, config=Config} = State,
#state{torrent_id=TorrentId, send_pid=SendPid, download=Download, remote=Remote, local=Local, config=Config} = State,
etorrent_peerconf:fast(Config) orelse erlang:error(badarg),
TmpRemote = etorrent_peerstate:hasall(Remote),
Pieceset = etorrent_peerstate:pieces(TmpRemote),
ok = etorrent_scarcity:add_peer(TorrentID, Pieceset),
ok = etorrent_scarcity:add_peer(TorrentId, Pieceset),
TmpLocal = check_local_interest(Pieceset, Local, SendPid),
NewRemote = check_remote_seeder(TmpRemote, TmpLocal),
NewLocal = poll_local_rqueue(Download, SendPid, NewRemote, TmpLocal),
NewState = State#state{remote=NewRemote, local=NewLocal},
{ok, NewState};

handle_message({bitfield, Bitfield}, State) ->
#state{torrent_id=TorrentID, send_pid=SendPid, download=Download, local=Local, remote=Remote} = State,
#state{torrent_id=TorrentId, send_pid=SendPid, download=Download, local=Local, remote=Remote} = State,
TmpRemote = etorrent_peerstate:hasset(Bitfield, Remote),
Pieceset = etorrent_peerstate:pieces(TmpRemote),
ok = etorrent_scarcity:add_peer(TorrentID, Pieceset),
ok = etorrent_scarcity:add_peer(TorrentId, Pieceset),
TmpLocal = check_local_interest(Pieceset, Local, SendPid),
NewRemote = check_remote_seeder(TmpRemote, TmpLocal),
NewLocal = poll_local_rqueue(Download, SendPid, NewRemote, TmpLocal),
NewState = State#state{remote=NewRemote, local=NewLocal},
{ok, NewState};

handle_message({piece, Index, Offset, Data}, State) ->
#state{torrent_id=TorrentID} = State,
#state{torrent_id=TorrentId} = State,
#state{send_pid=SendPid, download=Download, local=Local, remote=Remote} = State,
Length = byte_size(Data),
Requests = etorrent_peerstate:requests(Local),
NewLocal = case etorrent_rqueue:is_head(Index, Offset, Length, Requests) of
true ->
ok = etorrent_download:chunk_fetched(Index, Offset, Length, Download),
ok = etorrent_io:write_chunk(TorrentID, Index, Offset, Data),
ok = etorrent_io:write_chunk(TorrentId, Index, Offset, Data),
ok = etorrent_download:chunk_stored(Index, Offset, Length, Download),
NewRequests = etorrent_rqueue:pop(Requests),
TmpLocal = etorrent_peerstate:requests(NewRequests, Local),
Expand Down Expand Up @@ -723,7 +723,7 @@ handle_message(Unknown, State) ->
% @doc Initialize the connection, depending on the way the connection is
connection_initialize(incoming, State) ->
Expand All @@ -733,18 +733,18 @@ connection_initialize(incoming, State) ->
Valid = etorrent_peerstate:pieces(Local),
case etorrent_proto_wire:complete_handshake(Socket, Infohash, LocalID) of
ok ->
SendPid = complete_connection_setup(Socket, TorrentID, Extended, Valid),
SendPid = complete_connection_setup(Socket, TorrentId, Extended, Valid),
NewState = State#state{send_pid=SendPid},
{ok, NewState};
{error, stop} ->
{stop, normal}

connection_initialize(outgoing, State) ->
#state{torrent_id=TorrentID, socket=Socket, local=Local, config=Config} = State,
#state{torrent_id=TorrentId, socket=Socket, local=Local, config=Config} = State,
Extended = etorrent_peerconf:extended(Config),
Valid = etorrent_peerstate:pieces(Local),
SendPid = complete_connection_setup(Socket, TorrentID, Extended, Valid),
SendPid = complete_connection_setup(Socket, TorrentId, Extended, Valid),
NewState = State#state{send_pid=SendPid},
{ok, NewState}.

Expand All @@ -756,7 +756,7 @@ connection_initialize(outgoing, State) ->
%% * Start the send pid
%% * Send off the bitfield
complete_connection_setup(Socket, _TorrentID, Extended, Valid) ->
complete_connection_setup(Socket, _TorrentId, Extended, Valid) ->
SendPid = etorrent_peer_send:await_server(Socket),
Bitfield = etorrent_pieceset:to_binary(Valid),
Extended andalso etorrent_peer_send:extended_msg(SendPid),
Expand Down
5 changes: 2 additions & 3 deletions src/etorrent_peer_pool.erl
Expand Up @@ -49,7 +49,7 @@ start_child(TrackerUrl, PeerId, InfoHash, Id,
{ok, _Pid} ->
RecvPid = etorrent_peer_recv:await_server(Socket),
ControlPid = etorrent_peer_control:await_server(Socket),
{ok, RecvPid, ControlPid}
{ok, RecvPid, ControlPid};
{error, Reason} ->
lager:warning("Error starting child: ~p", [Reason]),
{error, Reason}
Expand All @@ -60,7 +60,6 @@ start_child(TrackerUrl, PeerId, InfoHash, Id,
%% @private
init([Id]) ->
gproc:add_local_name({torrent, Id, peer_pool_sup}),
ChildSpec = {child, {
{etorrent_peer_control, start_link, []},
ChildSpec = {child, {etorrent_peer_control, start_link, []},
temporary, 5000, worker, [etorrent_peer_control]},
{ok, {{simple_one_for_one, 10, 3600}, [ChildSpec]}}.

0 comments on commit 1fd7a35

Please sign in to comment.