Skip to content

Commit

Permalink
Expose configuration of Compactio Percentage targets
Browse files Browse the repository at this point in the history
I think these should be set differently, so make them configurable.
  • Loading branch information
martinsumner committed Jul 23, 2018
1 parent 92bd251 commit 66a7592
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 38 deletions.
6 changes: 5 additions & 1 deletion include/leveled.hrl
Expand Up @@ -58,7 +58,9 @@
waste_retention_period :: integer() | undefined,
compression_method = native :: lz4|native,
compress_on_receipt = false :: boolean(),
max_run_length}).
max_run_length,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined}).

-record(penciller_options,
{root_path :: string() | undefined,
Expand All @@ -77,6 +79,8 @@
cdb_options = #cdb_options{} :: #cdb_options{},
waste_retention_period :: integer() | undefined,
compression_method = native :: lz4|native,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
reload_strategy = [] :: list()}).

-record(recent_aae, {filter :: whitelist|blacklist,
Expand Down
28 changes: 26 additions & 2 deletions priv/leveled.schema
Expand Up @@ -89,11 +89,35 @@
%% In a single compaction run, what is the maximum number of consecutive files
%% which may be compacted.
{mapping, "leveled.max_run_length", "leveled.max_run_length", [
{default, 8},
{datatype, integer},
{default, 6},
{datatype, integer}
]}.

%% @doc Target Percentage for Max Run
%% What is the target score for a maximum run of files, to qualify for
%% compaction. If less than this percentage would be retained after compaction
%% then it is a candidate (e.g. in default case if 25% of space would be
%% recovered)
{mapping, "leveled.maxrunlength_compactionpercentage", "leveled.maxrunlength_compactionpercentage", [
{default, 75.0},
{datatype, float},
hidden
]}.

%% @doc Target Percentage for Single File
%% What is the target score for a run of a single file, to qualify for
%% compaction. If less than this percentage would be retained after compaction
%% then it is a candidate (e.g. in default case if 50% of space would be
%% recovered)
{mapping, "leveled.singlefile_compactionpercentage", "leveled.singlefile_compactionpercentage", [
{default, 50.0},
{datatype, float},
hidden
]}.







21 changes: 21 additions & 0 deletions src/leveled_bookie.erl
Expand Up @@ -99,6 +99,8 @@
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
{singlefile_compactionpercentage, 50.0},
{maxrunlength_compactionpercentage, 70.0},
{reload_strategy, []},
{max_pencillercachesize, undefined},
{compression_method, ?COMPRESSION_METHOD},
Expand Down Expand Up @@ -225,6 +227,15 @@
% The maximum number of consecutive files that can be compacted in
% one compaction operation.
% Defaults to leveled_iclerk:?MAX_COMPACTION_RUN (if undefined)
{singlefile_compactionpercentage, float()} |
% What is the percentage of space to be recovered from compacting
% a single file, before that file can be a compaction candidate in
% a compaction run of length 1
{maxrunlength_compactionpercentage, float()} |
% What is the percentage of space to be recovered from compacting
% a run of max_run_length, before that run can be a compaction
% candidate. For runs between 1 and max_run_length, a
% proportionate score is calculated
{reload_strategy, list()} |
% The reload_strategy is exposed as an option as currently no firm
% decision has been made about how recovery from failure should
Expand Down Expand Up @@ -1008,6 +1019,14 @@ set_options(Opts) ->
ok = filelib:ensure_dir(JournalFP),
ok = filelib:ensure_dir(LedgerFP),

SFL_CompPerc =
proplists:get_value(singlefile_compactionpercentage, Opts),
MRL_CompPerc =
proplists:get_value(maxrunlength_compactionpercentage, Opts),
true = MRL_CompPerc >= SFL_CompPerc,
true = 100.0 >= MRL_CompPerc,
true = SFL_CompPerc >= 0.0,

CompressionMethod = proplists:get_value(compression_method, Opts),
CompressOnReceipt =
case proplists:get_value(compression_point, Opts) of
Expand All @@ -1023,6 +1042,8 @@ set_options(Opts) ->
{#inker_options{root_path = JournalFP,
reload_strategy = ReloadStrategy,
max_run_length = proplists:get_value(max_run_length, Opts),
singlefile_compactionperc = SFL_CompPerc,
maxrunlength_compactionperc = MRL_CompPerc,
waste_retention_period = WRP,
compression_method = CompressionMethod,
compress_on_receipt = CompressOnReceipt,
Expand Down
68 changes: 39 additions & 29 deletions src/leveled_iclerk.erl
Expand Up @@ -95,21 +95,21 @@
-define(SAMPLE_SIZE, 100).
-define(BATCH_SIZE, 32).
-define(BATCHES_TO_CHECK, 8).
%% How many consecutive files to compact in one run
-define(MAX_COMPACTION_RUN, 8).
%% Sliding scale to allow preference of longer runs up to maximum
-define(SINGLEFILE_COMPACTION_TARGET, 40.0).
-define(MAXRUN_COMPACTION_TARGET, 70.0).
-define(CRC_SIZE, 4).
-define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])).
-define(INTERVALS_PER_HOUR, 4).
-define(MAX_COMPACTION_RUN, 8).
-define(SINGLEFILE_COMPACTION_TARGET, 50.0).
-define(MAXRUNLENGTH_COMPACTION_TARGET, 75.0).

-record(state, {inker :: pid() | undefined,
max_run_length :: integer() | undefined,
cdb_options,
waste_retention_period :: integer() | undefined,
waste_path :: string() | undefined,
reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(),
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native}).

-record(candidate, {low_sqn :: integer() | undefined,
Expand Down Expand Up @@ -183,13 +183,30 @@ init([IClerkOpts]) ->
MRL0 ->
MRL0
end,


SFL_CompPerc =
case IClerkOpts#iclerk_options.singlefile_compactionperc of
undefined ->
?SINGLEFILE_COMPACTION_TARGET;
SFLCP when is_float(SFLCP) ->
SFLCP
end,
MRL_CompPerc =
case IClerkOpts#iclerk_options.maxrunlength_compactionperc of
undefined ->
?MAXRUNLENGTH_COMPACTION_TARGET;
MRLCP when is_float(MRLCP) ->
MRLCP
end,

{ok, #state{max_run_length = MRL,
inker = IClerkOpts#iclerk_options.inker,
cdb_options = CDBopts,
reload_strategy = ReloadStrategy,
waste_path = WP,
waste_retention_period = WRP,
singlefile_compactionperc = SFL_CompPerc,
maxrunlength_compactionperc = MRL_CompPerc,
compression_method =
IClerkOpts#iclerk_options.compression_method}}.

Expand All @@ -208,11 +225,15 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
CDBopts = State#state.cdb_options,

Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN),
BestRun0 = assess_candidates(Candidates, MaxRunLength),
case score_run(BestRun0, MaxRunLength) of
ScoreParams =
{MaxRunLength,
State#state.maxrunlength_compactionperc,
State#state.singlefile_compactionperc},
BestRun0 = assess_candidates(Candidates, ScoreParams),
case score_run(BestRun0, ScoreParams) of
Score when Score > 0.0 ->
BestRun1 = sort_run(BestRun0),
print_compaction_run(BestRun1, MaxRunLength),
print_compaction_run(BestRun1, ScoreParams),
ManifestSlice = compact_files(BestRun1,
CDBopts,
FilterFun,
Expand Down Expand Up @@ -436,13 +457,6 @@ fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List).


assess_candidates(AllCandidates, MaxRunLength) when is_integer(MaxRunLength) ->
% This will take the defaults for other params.
% Unit tests should pass tuple as params including tested defaults
assess_candidates(AllCandidates,
{MaxRunLength,
?MAXRUN_COMPACTION_TARGET,
?SINGLEFILE_COMPACTION_TARGET});
assess_candidates(AllCandidates, Params) ->
NaiveBestRun = assess_candidates(AllCandidates, Params, [], []),
MaxRunLength = element(1, Params),
Expand Down Expand Up @@ -492,11 +506,7 @@ choose_best_assessment(RunToAssess, BestRun, Params) ->
end
end.

score_run(Run, MaxRunLength) when is_integer(MaxRunLength) ->
Params = {MaxRunLength,
?MAXRUN_COMPACTION_TARGET,
?SINGLEFILE_COMPACTION_TARGET},
score_run(Run, Params);

score_run([], _Params) ->
0.0;
score_run(Run, {MaxRunLength, MR_CT, SF_CT}) ->
Expand All @@ -515,9 +525,9 @@ score_run(Run, {MaxRunLength, MR_CT, SF_CT}) ->
Target - RunTotal / length(Run).


print_compaction_run(BestRun, MaxRunLength) ->
print_compaction_run(BestRun, ScoreParams) ->
leveled_log:log("IC005", [length(BestRun),
score_run(BestRun, MaxRunLength)]),
score_run(BestRun, ScoreParams)]),
lists:foreach(fun(File) ->
leveled_log:log("IC006", [File#candidate.filename])
end,
Expand Down Expand Up @@ -716,19 +726,19 @@ simple_score_test() ->
#candidate{compaction_perc = 75.0},
#candidate{compaction_perc = 76.0},
#candidate{compaction_perc = 70.0}],
?assertMatch(-4.0, score_run(Run1, 4)),
?assertMatch(-4.0, score_run(Run1, {4, 70.0, 40.0})),
Run2 = [#candidate{compaction_perc = 75.0}],
?assertMatch(-35.0, score_run(Run2, 4)),
?assertMatch(0.0, score_run([], 4)),
?assertMatch(-35.0, score_run(Run2, {4, 70.0, 40.0})),
?assertMatch(0.0, score_run([], {4, 40.0, 70.0})),
Run3 = [#candidate{compaction_perc = 100.0}],
?assertMatch(-60.0, score_run(Run3, 4)).
?assertMatch(-60.0, score_run(Run3, {4, 70.0, 40.0})).

score_compare_test() ->
Run1 = [#candidate{compaction_perc = 55.0},
#candidate{compaction_perc = 55.0},
#candidate{compaction_perc = 56.0},
#candidate{compaction_perc = 50.0}],
?assertMatch(16.0, score_run(Run1, 4)),
?assertMatch(16.0, score_run(Run1, {4, 70.0, 40.0})),
Run2 = [#candidate{compaction_perc = 55.0}],
?assertMatch(Run1,
choose_best_assessment(Run1,
Expand Down Expand Up @@ -758,7 +768,7 @@ find_bestrun_test() ->
%% Tests dependent on these defaults
%% -define(MAX_COMPACTION_RUN, 4).
%% -define(SINGLEFILE_COMPACTION_TARGET, 40.0).
%% -define(MAXRUN_COMPACTION_TARGET, 60.0).
%% -define(MAXRUNLENGTH_COMPACTION_TARGET, 60.0).
%% Tested first with blocks significant as no back-tracking
Params = {4, 60.0, 40.0},
Block1 = [#candidate{compaction_perc = 55.0},
Expand Down
20 changes: 14 additions & 6 deletions src/leveled_inker.erl
Expand Up @@ -601,14 +601,20 @@ start_from_file(InkOpts) ->
WRP = InkOpts#inker_options.waste_retention_period,
ReloadStrategy = InkOpts#inker_options.reload_strategy,
MRL = InkOpts#inker_options.max_run_length,
SFL_CompactPerc = InkOpts#inker_options.singlefile_compactionperc,
MRL_CompactPerc = InkOpts#inker_options.maxrunlength_compactionperc,
PressMethod = InkOpts#inker_options.compression_method,
PressOnReceipt = InkOpts#inker_options.compress_on_receipt,
IClerkOpts = #iclerk_options{inker = self(),
cdb_options=IClerkCDBOpts,
waste_retention_period = WRP,
reload_strategy = ReloadStrategy,
compression_method = PressMethod,
max_run_length = MRL},
IClerkOpts =
#iclerk_options{inker = self(),
cdb_options=IClerkCDBOpts,
waste_retention_period = WRP,
reload_strategy = ReloadStrategy,
compression_method = PressMethod,
max_run_length = MRL,
singlefile_compactionperc = SFL_CompactPerc,
maxrunlength_compactionperc = MRL_CompactPerc
},

{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),

Expand Down Expand Up @@ -1175,6 +1181,8 @@ compact_journal_testto(WRP, ExpectedFiles) ->
cdb_options=CDBopts,
reload_strategy=RStrategy,
waste_retention_period=WRP,
singlefile_compactionperc=40.0,
maxrunlength_compactionperc=70.0,
compression_method=native,
compress_on_receipt=false},

Expand Down

0 comments on commit 66a7592

Please sign in to comment.