
Commit

Merge branch 'kill-ops'
Conflicts:
	apps/etorrent/src/etorrent_dht_net.erl
jlouis committed Jan 26, 2011
2 parents 74a1b12 + 8595336 commit 858a8ad
Showing 7 changed files with 147 additions and 217 deletions.
7 changes: 0 additions & 7 deletions apps/etorrent/include/etorrent_chunk.hrl

This file was deleted.

1 change: 0 additions & 1 deletion apps/etorrent/include/types.hrl
@@ -1,6 +1,5 @@
%% Various types spanning multiple modules.
-type tier() :: [string()].
--type operation() :: {integer(), integer(), integer()}.
-type bitfield() :: binary().
-type ip() :: {integer(), integer(), integer(), integer()}.
-type capabilities() :: extended_messaging.
60 changes: 39 additions & 21 deletions apps/etorrent/src/etorrent_chunk_mgr.erl
@@ -47,7 +47,6 @@
%%-------------------------------------------------------------------
-module(etorrent_chunk_mgr).

--include("etorrent_chunk.hrl").
-include("types.hrl").
-include("log.hrl").

@@ -57,16 +56,23 @@

%% @todo: What pid is the chunk recording pid? Control or SendPid?
%% API
--export([start_link/0, store_chunk/4, putback_chunks/1,
+-export([start_link/0, store_chunk/2, putback_chunks/1,
mark_fetched/2, pick_chunks/3,
new/1, endgame_remove_chunk/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

+%% A mapping containing the chunks tracking
+-record(chunk, {idt :: {integer() | '_' | '$1',
+                        integer() | '_' | '$1',
+                        not_fetched | fetched | {assigned, pid() | '_'} | '_'},
+                chunk :: integer() | {integer() | '_', integer() | '_'} | '_' | '$2'}).

+-record(state, { torrent_dict,
+                 monitored_peers = gb_sets:empty() }).

-define(SERVER, ?MODULE).
-define(TAB, etorrent_chunk_tbl).
-define(STORE_CHUNK_TIMEOUT, 20).
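For orientation: the #chunk record added above (replacing the definition from the deleted etorrent_chunk.hrl) keys the ETS table on {TorrentId, PieceIndex, State}, where State is not_fetched, fetched or {assigned, Pid}, and the chunk field now carries a plain {Offset, Len} pair instead of also dragging the operations list along. A hypothetical helper, not part of this commit, showing how such rows are matched; it mirrors the match patterns used later in the module:

    %% Hypothetical sketch: list the not-yet-fetched chunks of one piece,
    %% using the #chunk layout defined above. ?TAB is etorrent_chunk_tbl.
    unfetched_chunks(Id, Index) ->
        ets:match_object(?TAB, #chunk{ idt = {Id, Index, not_fetched},
                                       chunk = '_' }).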
@@ -91,10 +97,9 @@ mark_fetched(Id, {Index, Offset, Len}) ->
%% @doc Store the chunk in the chunk table.
%% As a side-effect, check the piece if it is fully fetched.
%% @end
--spec store_chunk(integer(), {integer(), binary(), term()}, {integer(), integer()}, pid()) ->
-          ok.
-store_chunk(Id, {Index, D, Ops}, {Offset, Len}, FSPid) ->
-    gen_server:cast(?SERVER, {store_chunk, Id, self(), {Index, D, Ops}, {Offset, Len}, FSPid}).
+-spec store_chunk(integer(), {integer(), integer(), binary()}) -> ok.
+store_chunk(Id, {Index, Offset, D}) ->
+    gen_server:cast(?SERVER, {store_chunk, Id, self(), {Index, Offset, D}}).

%% @doc Find all chunks assigned to Pid and mark them as not_fetched
%% This is called by a peer when the remote chokes.
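With the operations list gone, callers of store_chunk/2 above pass only the piece index, offset and data; the block length is recovered with byte_size/1 on the server side, and the file-system pid is no longer threaded through. A hedged sketch of a call site (the function and variable names are placeholders, not code from this commit):

    %% Sketch only: hand a completed block over to the chunk manager.
    %% Index, Offset and Data would come from a decoded `piece' message.
    on_block_received(TorrentId, Index, Offset, Data) when is_binary(Data) ->
        ok = etorrent_chunk_mgr:store_chunk(TorrentId, {Index, Offset, Data}).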
@@ -115,10 +120,9 @@ endgame_remove_chunk(SendPid, Id, {Index, Offset, Len}) ->

%% @doc Return some chunks for downloading.
%% @end
--type chunk_lst1() :: [{integer(), integer(), integer(), [operation()]}].
--type chunk_lst2() :: [{integer(), [#chunk{}]}].
+-type chunk_lst() :: [{integer(), [{integer(), integer()}]}].
-spec pick_chunks(integer(), unknown | gb_set(), integer()) ->
-        none_eligible | not_interested | {ok | endgame, chunk_lst1() | chunk_lst2()}.
+        none_eligible | not_interested | {ok | endgame, chunk_lst()}.
pick_chunks(_Id, unknown, _N) ->
none_eligible;
pick_chunks(Id, Set, N) ->
@@ -162,7 +166,7 @@ handle_call({new, Id}, {Pid, _Tag}, S) ->
{reply, ok, S#state { torrent_dict = ManageDict }};
handle_call({mark_fetched, Id, Index, Offset, _Len}, _From, S) ->
case ets:match_object(?TAB, #chunk { idt = {Id, Index, not_fetched},
-                                         chunk = {Offset, '_', '_'} }) of
+                                         chunk = {Offset, '_'} }) of
[] -> {reply, assigned, S};
[Obj] -> ets:delete_object(?TAB, Obj),
{reply, found, S}
@@ -211,19 +215,19 @@ ensure_monitor(Pid, Set) ->
end.

%% @private
-handle_cast({store_chunk, Id, Pid, {Index, Data, Ops}, {Offset, Len}, FSPid}, S) ->
+handle_cast({store_chunk, Id, Pid, {Index, Offset, Data}}, S) ->
ok = etorrent_io:write_chunk(Id, Index, Offset, Data),
%% Add the newly fetched data to the fetched list
-    Present = update_fetched(Id, Index, {Offset, Len}),
+    Present = update_fetched(Id, Index, {Offset, byte_size(Data)}),
%% Update chunk assignment
-    update_chunk_assignment(Id, Index, Pid, {Offset, Len}),
+    update_chunk_assignment(Id, Index, Pid, {Offset, byte_size(Data)}),
%% Countdown number of missing chunks
case Present of
fetched -> ok;
true -> ok;
false ->
case etorrent_piece_mgr:decrease_missing_chunks(Id, Index) of
-                full -> check_piece(FSPid, Id, Index);
+                full -> check_piece(Id, Index);
X -> X
end
end,
@@ -273,7 +277,7 @@ clear_torrent_entries(Id) ->
%% @end
%% @todo Consider using ets:fun2ms here to parse-transform the matches
-spec find_remaining_chunks(integer(), set()) ->
-        [{integer(), integer(), integer(), [operation()]}].
+        [{integer(), integer(), integer()}].
find_remaining_chunks(Id, PieceSet) ->
%% Note that the chunk table is often very small.
MatchHeadAssign = #chunk { idt = {Id, '$1', {assigned, '_'}}, chunk = '$2'},
@@ -282,7 +286,7 @@ find_remaining_chunks(Id, PieceSet) ->
RowsN = ets:select(?TAB, [{MatchHeadNotFetch, [], [{{'$1', '$2'}}]}]),
Eligible = [{PN, Chunk} || {PN, Chunk} <- RowsA ++ RowsN,
gb_sets:is_element(PN, PieceSet)],
-    [{PN, Os, Sz, Ops} || {PN, {Os, Sz, Ops}} <- Eligible].
+    [{PN, Os, Sz} || {PN, {Os, Sz}} <- Eligible].

%% @doc Chunkify a new piece.
%%
@@ -302,7 +306,7 @@ chunkify_new_piece(Id, PieceSet) when is_integer(Id) ->
end.

%% Check the piece Idx on torrent Id for completion
-check_piece(_, Id, Idx) ->
+check_piece(Id, Idx) ->
_ = spawn_link(etorrent_fs_checker, check_piece, [Id, Idx]),
ets:match_delete(?TAB, #chunk { idt = {Id, Idx, '_'}, _ = '_'}).

@@ -353,7 +357,7 @@ update_fetched(Id, Index, {Offset, _Len}) ->
update_chunk_assignment(Id, Index, Pid,
{Offset, _Len}) ->
ets:match_delete(?TAB, #chunk { idt = {Id, Index, {assigned, Pid}},
-                                    chunk = {Offset, '_', '_'} }).
+                                    chunk = {Offset, '_'} }).

%%
%% There are 0 remaining chunks to be desired, return the chunks so far
@@ -399,13 +403,27 @@ pick_chunks(chunkify_piece, {Pid, Id, PieceSet, SoFar, Remaining, Res}) ->
none_eligible ->
{ok, SoFar}
end;
%% @todo: Go through from here and check if it can be parallelized!
%%
%% Handle the endgame for a torrent gracefully
pick_chunks(endgame, {Id, PieceSet, N}) ->
Remaining = find_remaining_chunks(Id, PieceSet),
Shuffled = etorrent_utils:list_shuffle(Remaining),
-    {endgame, lists:sublist(Shuffled, N)}.
+    Grouped = gather(
+                lists:sort(
+                  lists:sublist(Shuffled, N))),
+    {endgame, etorrent_utils:list_shuffle(Grouped)}.

+%% Gather like pieces in the endgame
+gather([]) -> [];
+gather([{PN, Off, Sz} | R]) ->
+    gather(PN, [{Off, Sz}], R).
+
+gather(PN, Item, []) ->
+    [{PN, lists:sort(Item)}];
+gather(PN, Items, [{PN, Off, Sz} | Next]) ->
+    gather(PN, [{Off, Sz} | Items], Next);
+gather(PN, Items, [{PN2, Off, Sz} | Next]) ->
+    [{PN, lists:sort(Items)} | gather(PN2, [{Off, Sz}], Next)].


-spec pick_chunks_endgame(integer(), gb_set(), integer(), X) -> X | {endgame, [#chunk{}]}.
pick_chunks_endgame(Id, Set, Remaining, Ret) ->
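The endgame path above now sorts the remaining {PieceNumber, Offset, Size} triples, groups them per piece with gather/1, and only then shuffles, so a peer receives its endgame chunks batched by piece. A worked example of gather/1 under those assumptions (16384-byte chunk sizes are illustrative; such a test would have to live in the module itself, since gather is not exported):

    %% Sketch only: gather/1 expects its input sorted by piece number.
    gather_groups_by_piece_test() ->
        Sorted = [{1, 0, 16384}, {1, 16384, 16384}, {2, 0, 16384}],
        [{1, [{0, 16384}, {16384, 16384}]},
         {2, [{0, 16384}]}] = gather(Sorted).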
8 changes: 4 additions & 4 deletions apps/etorrent/src/etorrent_dht_net.erl
@@ -528,9 +528,9 @@ handle_query('get_peers', Params, IP, Port, MsgID, Self, Tokens) ->

handle_query('announce', Params, IP, Port, MsgID, Self, Tokens) ->
InfoHash = etorrent_dht:integer_id(get_value(<<"info_hash">>, Params)),
-    BTPort = get_value(<<"port">>, Params),
-    Token = get_value(<<"token">>, Params),
-    _ = case is_valid_token(Token, IP, Port, Tokens) of
+    BTPort = get_value(<<"port">>, Params),
+    Token = get_string(<<"token">>, Params),
+    case is_valid_token(Token, IP, Port, Tokens) of
true ->
etorrent_dht_tracker:announce(InfoHash, IP, BTPort);
false ->
@@ -619,7 +619,7 @@ renew_token(Tokens) ->


decode_msg(InMsg) ->
-    {ok, Msg} = etorrent_bcoding:decode(InMsg),
+    Msg = etorrent_bcoding:decode(InMsg),
MsgID = get_value(<<"t">>, Msg),
case get_value(<<"y">>, Msg) of
<<"q">> ->
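The decode_msg change above tracks an etorrent_bcoding API in which decode/1 is assumed to return the decoded term directly rather than {ok, Term}. A minimal sketch of the new call shape, using a made-up bencoded ping query (the concrete representation of the decoded dictionary is not shown in this diff):

    %% Sketch only: decode a KRPC ping query and pull out its transaction id.
    decode_ping_example() ->
        Msg = etorrent_bcoding:decode(<<"d1:q4:ping1:t2:aa1:y1:qe">>),
        get_value(<<"t">>, Msg).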
2 changes: 1 addition & 1 deletion apps/etorrent/src/etorrent_fs_checker.erl
@@ -65,7 +65,7 @@ read_and_check_torrent(Id, Path) ->
%% @end
-spec check_piece(torrent_id(), integer()) -> ok.
check_piece(TorrentID, PieceIndex) ->
-    {InfoHash, _} = etorrent_piece_mgr:piece_info(TorrentID, PieceIndex),
+    InfoHash = etorrent_piece_mgr:piece_hash(TorrentID, PieceIndex),
{ok, PieceBin} = etorrent_io:read_piece(TorrentID, PieceIndex),
PieceSize = byte_size(PieceBin),
case crypto:sha(PieceBin) == InfoHash of
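check_piece now asks etorrent_piece_mgr:piece_hash/2 for just the expected digest and compares it to the SHA-1 of the piece read back from disk. A self-contained sketch of that comparison, assuming ExpectedHash is the 20-byte SHA-1 from the metainfo (crypto:sha/1 returns a 20-byte binary):

    %% Sketch only: does PieceBin hash to the expected 20-byte digest?
    -spec piece_ok(binary(), binary()) -> boolean().
    piece_ok(ExpectedHash, PieceBin) when byte_size(ExpectedHash) =:= 20 ->
        crypto:sha(PieceBin) =:= ExpectedHash.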
