allows replication logs to be saved in their own file

benoitc committed Jun 15, 2013
1 parent 928e375 commit 45c7cc555e4bcaa6de84e5e9e7aa574eeb569f50
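
The mechanism behind this change: every replicator log call now carries lager metadata, either through the new three-argument `?LOG_DEBUG`/`?LOG_INFO`/`?LOG_ERROR` macros or through the replication-specific `?LOG_REP*` macros added to `couch_replicator.hrl` below, all of which tag messages with attributes such as `[{task, replication}]`. lager can trace on those attributes and redirect matching messages to their own file. A minimal sketch, assuming lager's runtime tracing API (the file path and level are illustrative, not part of this commit):

    %% Route all messages tagged {task, replication} to a dedicated file.
    %% Path and level are assumptions for illustration.
    {ok, _Trace} = lager:trace_file("log/replication.log",
                                    [{task, replication}], debug).
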
@@ -37,10 +37,13 @@
-define(DEFAULT_ATTACHMENT_CONTENT_TYPE, <<"application/octet-stream">>).
-define(LOG_DEBUG(Format, Args), lager:debug(Format, Args)).
+-define(LOG_DEBUG(Attr, Format, Args), lager:debug(Attr, Format, Args)).
-define(LOG_INFO(Format, Args), lager:info(Format, Args)).
+-define(LOG_INFO(Attr, Format, Args), lager:info(Attr, Format, Args)).
-define(LOG_ERROR(Format, Args), lager:error(Format, Args)).
+-define(LOG_ERROR(Attr, Format, Args), lager:error(Attr, Format, Args)).
% Tree::term() is really a tree(), but we don't want to require R13B04 yet
-type branch() :: {Key::term(), Value::term(), Tree::term()}.
@@ -107,6 +107,9 @@ handle_call({add_task, TaskProps}, {From, _}, Server) ->
[] ->
true = ets:insert(?MODULE, {From, TaskProps}),
erlang:monitor(process, From),
+ Type = proplists:get_value(type, TaskProps),
+ ?LOG_DEBUG([{task, Type}], "Add task for ~p: ~p",
+ [From, TaskProps]),
{reply, ok, Server};
[_] ->
{reply, {add_task_error, already_registered}, Server}
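
Since `couch_task_status` now stamps each entry with the task's `type`, a trace can also single out one kind of task at runtime. A sketch assuming lager's `trace_console/2`; the filter value is illustrative:

    %% Print only messages from tasks of type `replication` to the console.
    {ok, _} = lager:trace_console([{task, replication}], debug).
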
@@ -123,7 +126,9 @@ handle_call(all, _, Server) ->
handle_cast({update_status, Pid, NewProps}, Server) ->
case ets:lookup(?MODULE, Pid) of
[{Pid, _CurProps}] ->
- ?LOG_DEBUG("New task status for ~p: ~p", [Pid, NewProps]),
+ Type = proplists:get_value(type, NewProps),
+ ?LOG_DEBUG([{task, Type}], "New task status for ~p: ~p",
+ [Pid, NewProps]),
true = ets:insert(?MODULE, {Pid, NewProps});
_ ->
% Task finished/died in the meanwhile and we must have received
@@ -128,13 +128,13 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
%
case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
{ok, Pid} ->
- ?LOG_INFO("starting new replication `~s` at ~p (`~s` -> `~s`)",
+ ?LOG_REP("starting new replication `~s` at ~p (`~s` -> `~s`)",
[RepChildId, Pid, Source, Target]),
{ok, Pid};
{error, already_present} ->
case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
{ok, Pid} ->
- ?LOG_INFO("restarting replication `~s` at ~p (`~s` -> `~s`)",
+ ?LOG_REP("restarting replication `~s` at ~p (`~s` -> `~s`)",
[RepChildId, Pid, Source, Target]),
{ok, Pid};
{error, running} ->
@@ -154,7 +154,7 @@ async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
Error
end;
{error, {already_started, Pid}} ->
- ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
+ ?LOG_REP("replication `~s` already running at ~p (`~s` -> `~s`)",
[RepChildId, Pid, Source, Target]),
{ok, Pid};
{error, {Error, _}} ->
@@ -183,10 +183,10 @@ wait_for_result(RepId) ->
cancel_replication({BaseId, Extension}) ->
FullRepId = BaseId ++ Extension,
- ?LOG_INFO("Canceling replication `~s`...", [FullRepId]),
+ ?LOG_REP("Canceling replication `~s`...", [FullRepId]),
case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
ok ->
- ?LOG_INFO("Replication `~s` canceled.", [FullRepId]),
+ ?LOG_REP("Replication `~s` canceled.", [FullRepId]),
case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
ok ->
{ok, {cancelled, ?l2b(FullRepId)}};
@@ -196,7 +196,7 @@ cancel_replication({BaseId, Extension}) ->
Error
end;
Error ->
- ?LOG_ERROR("Error canceling replication `~s`: ~p", [FullRepId, Error]),
+ ?LOG_REP_ERROR("Error canceling replication `~s`: ~p", [FullRepId, Error]),
Error
end.
@@ -304,7 +304,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
% cancel_replication/1) and then start the replication again, but this is
% unfortunately not immune to race conditions.
- ?LOG_INFO("Replication `~p` is using:~n"
+ ?LOG_REP("Replication `~p` is using:~n"
"~c~p worker processes~n"
"~ca worker batch size of ~p~n"
"~c~p HTTP connections~n"
@@ -322,7 +322,7 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
end]),
- ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
+ ?LOG_REP_DEBUG("Worker pids are: ~p", [Workers]),
couch_replicator_manager:replication_started(Rep),
@@ -339,32 +339,32 @@ handle_info(shutdown, St) ->
{stop, shutdown, St};
handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- ?LOG_ERROR("Source database is down. Reason: ~p", [Why]),
+ ?LOG_REP_ERROR("Source database is down. Reason: ~p", [Why]),
{stop, source_db_down, St};
handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- ?LOG_ERROR("Target database is down. Reason: ~p", [Why]),
+ ?LOG_REP_ERROR("Target database is down. Reason: ~p", [Why]),
{stop, target_db_down, St};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
- ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
+ ?LOG_REP_ERROR("ChangesReader process died with reason: ~p", [Reason]),
{stop, changes_reader_died, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
- ?LOG_ERROR("ChangesManager process died with reason: ~p", [Reason]),
+ ?LOG_REP_ERROR("ChangesManager process died with reason: ~p", [Reason]),
{stop, changes_manager_died, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
- ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
+ ?LOG_REP_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
{stop, changes_queue_died, cancel_timer(State)};
handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
@@ -385,7 +385,7 @@ handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
false ->
{stop, {unknown_process_died, Pid, Reason}, State2};
true ->
- ?LOG_ERROR("Worker ~p died with reason: ~p", [Pid, Reason]),
+ ?LOG_REP_ERROR("Worker ~p died with reason: ~p", [Pid, Reason]),
{stop, {worker_died, Pid, Reason}, State2}
end.
@@ -415,7 +415,7 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
_ ->
NewThroughSeq0
end,
- ?LOG_DEBUG("Worker reported seq ~p, through seq was ~p, "
+ ?LOG_REP_DEBUG("Worker reported seq ~p, through seq was ~p, "
"new through seq is ~p, highest seq done was ~p, "
"new highest seq done is ~p~n"
"Seqs in progress were: ~p~nSeqs in progress are now: ~p",
@@ -478,7 +478,7 @@ terminate(Reason, State) ->
target_name = Target,
rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
} = State,
- ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
+ ?LOG_REP_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
[BaseId ++ Ext, Source, Target, to_binary(Reason)]),
terminate_cleanup(State),
couch_replicator_notifier:notify({error, RepId, Reason}),
@@ -512,7 +512,7 @@ start_timer(State) ->
{ok, Ref} ->
Ref;
Error ->
- ?LOG_ERROR("Replicator, error scheduling checkpoint: ~p", [Error]),
+ ?LOG_REP_ERROR("Replicator, error scheduling checkpoint: ~p", [Error]),
nil
end.
@@ -639,7 +639,7 @@ read_changes(StartSeq, Db, ChangesQueue, Options) ->
% Previous CouchDB releases had a bug which allowed a doc
% with an empty ID to be inserted into databases. Such doc
% is impossible to GET.
- ?LOG_ERROR("Replicator: ignoring document with empty ID in "
+ ?LOG_REP_ERROR("Replicator: ignoring document with empty ID in "
"source database `~s` (_changes sequence ~p)",
[couch_replicator_api_wrap:db_uri(Db), Seq]);
_ ->
@@ -655,13 +655,13 @@ read_changes(StartSeq, Db, ChangesQueue, Options) ->
LastSeq = get(last_seq),
Db2 = case LastSeq of
StartSeq ->
- ?LOG_INFO("Retrying _changes request to source database ~s"
+ ?LOG_REP("Retrying _changes request to source database ~s"
" with since=~p in ~p seconds",
[couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
ok = timer:sleep(Db#httpdb.wait),
Db#httpdb{wait = 2 * Db#httpdb.wait};
_ ->
- ?LOG_INFO("Retrying _changes request to source database ~s"
+ ?LOG_REP("Retrying _changes request to source database ~s"
" with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
Db
end,
@@ -727,7 +727,7 @@ do_checkpoint(State) ->
{checkpoint_commit_failure,
<<"Failure on target commit: ", (to_binary(Reason))/binary>>};
{SrcInstanceStartTime, TgtInstanceStartTime} ->
- ?LOG_INFO("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
+ ?LOG_REP("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
[SourceName, TargetName, NewSeq]),
StartTime = ?l2b(ReplicationStartTime),
EndTime = ?l2b(httpd_util:rfc1123_date()),
@@ -880,30 +880,30 @@ compare_replication_logs(SrcDoc, TgtDoc) ->
false ->
SourceHistory = get_value(<<"history">>, RepRecProps, []),
TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
- ?LOG_INFO("Replication records differ. "
+ ?LOG_REP("Replication records differ. "
"Scanning histories to find a common ancestor.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ ?LOG_REP_DEBUG("Record on source:~p~nRecord on target:~p~n",
[RepRecProps, RepRecPropsTgt]),
compare_rep_history(SourceHistory, TargetHistory)
end.
compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
- ?LOG_INFO("no common ancestry -- performing full replication", []),
+ ?LOG_REP("no common ancestry -- performing full replication", []),
{?LOWEST_SEQ, []};
compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
SourceId = get_value(<<"session_id">>, S),
case has_session_id(SourceId, Target) of
true ->
RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
- ?LOG_INFO("found a common replication record with source_seq ~p",
+ ?LOG_REP("found a common replication record with source_seq ~p",
[RecordSeqNum]),
{RecordSeqNum, SourceRest};
false ->
TargetId = get_value(<<"session_id">>, T),
case has_session_id(TargetId, SourceRest) of
true ->
RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
- ?LOG_INFO("found a common replication record with source_seq ~p",
+ ?LOG_REP("found a common replication record with source_seq ~p",
[RecordSeqNum]),
{RecordSeqNum, TargetRest};
false ->
@@ -30,3 +30,10 @@
docs_written = 0,
doc_write_failures = 0
}).
+
+-define(LOG_REP(Format, Args), lager:info([{task, replication}],
+ Format, Args)).
+-define(LOG_REP_ERROR(Format, Args), lager:error([{task, replication}],
+ Format, Args)).
+-define(LOG_REP_DEBUG(Format, Args), lager:debug([{task, replication}],
+ Format, Args)).
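
These macros are what let replication logs land in their own file: everything they emit carries `{task, replication}` metadata. The same routing can be set up at boot time rather than at the shell. A sys.config sketch, assuming a lager release that supports the `traces` configuration key (file names and levels are assumptions, not part of this commit):

    %% sys.config excerpt: keep general logs in console.log, and send
    %% everything tagged {task, replication} to its own file.
    [
     {lager, [
       {handlers, [
         {lager_file_backend, [{file, "log/console.log"}, {level, info}]}
       ]},
       {traces, [
         {{lager_file_backend, "log/replication.log"},
          [{task, replication}], debug}
       ]}
     ]}
    ].
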
@@ -16,7 +16,7 @@
-include_lib("couch_httpd/include/couch_httpd.hrl").
-include_lib("ibrowse/include/ibrowse.hrl").
-include("couch_replicator_api_wrap.hrl").
-
+-include("couch_replicator.hrl").
-export([setup/1]).
-export([send_req/3]).
@@ -156,7 +156,7 @@ maybe_retry(Error, Worker, #httpdb{retries = Retries, wait = Wait} = HttpDb,
release_worker(Worker, HttpDb),
Method = string:to_upper(atom_to_list(get_value(method, Params, get))),
Url = couch_util:url_strip_password(full_url(HttpDb, Params)),
- ?LOG_INFO("Retrying ~s request to ~s in ~p seconds due to error ~s",
+ ?LOG_REP("Retrying ~s request to ~s in ~p seconds due to error ~s",
[Method, Url, Wait / 1000, error_cause(Error)]),
ok = timer:sleep(Wait),
Wait2 = erlang:min(Wait * 2, ?MAX_WAIT),
@@ -172,11 +172,11 @@ report_error(Worker, HttpDb, Params, Error) ->
do_report_error(Url, Method, {code, Code}) ->
- ?LOG_ERROR("Replicator, request ~s to ~p failed. The received "
+ ?LOG_REP_ERROR("Replicator, request ~s to ~p failed. The received "
"HTTP error code is ~p", [Method, Url, Code]);
do_report_error(FullUrl, Method, Error) ->
- ?LOG_ERROR("Replicator, request ~s to ~p failed due to error ~s",
+ ?LOG_REP_ERROR("Replicator, request ~s to ~p failed due to error ~s",
[Method, FullUrl, error_cause(Error)]).