Skip to content

Commit

Permalink
fix: avoid uploading blocks too frequently and splitting large buffers
Browse files: browse the repository at this point in the history
  • Loading branch information
thalesmg committed May 28, 2024
1 parent 0b52ee7 commit 81964f1
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ fields(aggregation) ->
default => 1_000_000,
desc => ?DESC("aggregation_max_records")
}
)},
{min_block_size,
mk(
emqx_schema:bytesize(),
#{
default => <<"10mb">>,
importance => ?IMPORTANCE_HIDDEN,
required => true,
validator => fun block_size_validator/1
}
)}
];
fields(common_action_parameters) ->
Expand All @@ -156,7 +166,7 @@ fields(common_action_parameters) ->
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC("max_block_size"),
required => true,
validator => fun max_block_size_validator/1
validator => fun block_size_validator/1
}
)}
];
Expand Down Expand Up @@ -293,7 +303,7 @@ scunion(Field, Schemas, {value, Value}) ->
throw(#{field_name => Field, expected => maps:keys(Schemas)})
end.

max_block_size_validator(SizeLimit) ->
block_size_validator(SizeLimit) ->
case SizeLimit =< 4_000 * 1024 * 1024 of
true -> ok;
false -> {error, "must be less than 4000 MiB"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@
%% `emqx_template' API
-export([lookup/2]).

-ifdef(TEST).
-export([take_chunk/2]).
-endif.

%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -131,6 +127,7 @@
action := binary(),
blob := emqx_template:t(),
container := string(),
min_block_size := pos_integer(),
max_block_size := pos_integer(),
pool := connector_resource_id()
}
Expand All @@ -144,6 +141,8 @@
buffer_size := non_neg_integer(),
container := container(),
max_block_size := pos_integer(),
min_block_size := pos_integer(),
next_block := queue:queue(iolist()),
num_blocks := non_neg_integer(),
pool := pool_name(),
started := boolean()
Expand Down Expand Up @@ -348,6 +347,7 @@ init_transfer_state(Buffer, Opts) ->
blob := BlobTemplate,
container := Container,
max_block_size := MaxBlockSize,
min_block_size := MinBlockSize,
pool := Pool
}
} = Opts,
Expand All @@ -358,6 +358,8 @@ init_transfer_state(Buffer, Opts) ->
buffer_size => 0,
container => Container,
max_block_size => MaxBlockSize,
min_block_size => MinBlockSize,
next_block => queue:new(),
num_blocks => 0,
pool => Pool,
started => false
Expand All @@ -371,12 +373,25 @@ mk_blob_name_key(Buffer, ActionName, BlobTemplate) ->
process_append(IOData, TransferState0) ->
#{
buffer := Buffer,
buffer_size := BufferSize0
buffer_size := BufferSize0,
min_block_size := MinBlockSize,
next_block := NextBlock0
} = TransferState0,
TransferState0#{
buffer := [Buffer, IOData],
buffer_size := BufferSize0 + iolist_size(IOData)
}.
Size = iolist_size(IOData),
case Size + BufferSize0 >= MinBlockSize of
true ->
%% Block is ready to be written.
TransferState0#{
buffer := [],
buffer_size := 0,
next_block := queue:in([Buffer, IOData], NextBlock0)
};
false ->
TransferState0#{
buffer := [Buffer, IOData],
buffer_size := BufferSize0 + Size
}
end.

-spec process_write(transfer_state()) ->
{ok, transfer_state()} | {error, term()}.
Expand All @@ -396,25 +411,19 @@ process_write(TransferState0 = #{started := false}) ->
{error, Reason} ->
{error, Reason}
end;
process_write(TransferState = #{started := true, buffer_size := 0}) ->
{ok, TransferState};
process_write(TransferState0 = #{started := true}) ->
#{
buffer := Buffer,
buffer_size := BufferSize,
max_block_size := MaxBlockSize
next_block := NextBlock0
} = TransferState0,
case BufferSize > MaxBlockSize of
true ->
{IOData, NewBuffer} = take_chunk(Buffer, MaxBlockSize),
case queue:out(NextBlock0) of
{{value, Block}, NextBlock} ->
?tp(azure_blob_storage_will_write_chunk, #{}),
do_process_write(IOData, NewBuffer, TransferState0);
false ->
NewBuffer = [],
do_process_write(Buffer, NewBuffer, TransferState0)
do_process_write(Block, TransferState0#{next_block := NextBlock});
{empty, _} ->
{ok, TransferState0}
end.

do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) ->
do_process_write(IOData, TransferState0 = #{started := true}) ->
#{
blob := Blob,
container := Container,
Expand All @@ -423,12 +432,7 @@ do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) ->
} = TransferState0,
case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of
{ok, _} ->
BufferSize = iolist_size(NewBuffer),
TransferState = TransferState0#{
buffer := NewBuffer,
buffer_size := BufferSize,
num_blocks := NumBlocks + 1
},
TransferState = TransferState0#{num_blocks := NumBlocks + 1},
process_write(TransferState);
{error, Reason} ->
{error, Reason}
Expand All @@ -439,10 +443,21 @@ do_process_write(IOData, NewBuffer, TransferState0 = #{started := true}) ->
process_complete(TransferState) ->
#{
blob := Blob,
buffer := Buffer,
buffer_size := BufferSize,
container := Container,
num_blocks := NumBlocks,
num_blocks := NumBlocks0,
pool := Pool
} = TransferState,
%% Flush any left-over data
NumBlocks =
case BufferSize > 0 of
true ->
{ok, #{num_blocks := NumBlocks1}} = do_process_write(Buffer, TransferState),
NumBlocks1;
false ->
NumBlocks0
end,
BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)],
case put_block_list(Pool, Container, Blob, BlockRefs) of
{ok, _} ->
Expand Down Expand Up @@ -511,6 +526,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState)
aggregation := #{
container := ContainerOpts,
max_records := MaxRecords,
min_block_size := MinBlockSize,
time_interval := TimeInterval
},
container := ContainerName,
Expand All @@ -531,6 +547,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState)
blob => Blob,
container => ContainerName,
max_block_size => MaxBlockSize,
min_block_size => MinBlockSize,
pool => Pool
},
DeliveryOpts = #{
Expand Down Expand Up @@ -707,49 +724,6 @@ check_aggreg_upload_errors(AggregId) ->
check_container_accessible(Pool, Container) ->
list_blobs(Pool, Container).

%% Entry point for carving a chunk off the front of a transfer buffer.
%% Delegates to take_chunk/3 with the full `MaxBlockSize' as the byte budget
%% and an empty accumulator for the data taken so far.
%% Returns `{TakenIOList, RemainingBuffer}'.
-spec take_chunk(transfer_buffer(), pos_integer()) -> {_IOList :: iolist(), transfer_buffer()}.
take_chunk(Buffer, MaxBlockSize) ->
    take_chunk(Buffer, MaxBlockSize, []).

%% Worker for take_chunk/2.
%%
%% Arguments: the buffer still to be consumed, the byte budget left for the
%% current chunk, and the iolist collected so far.
%% Returns `{Taken, Remaining}': the collected iolist and the buffer that was
%% not consumed.
%%
%% Only buffers shaped as `[]', `[X]' or `[X, Y]' are matched by the
%% element-wise clauses; any buffer is accepted once the budget is used up.
-spec take_chunk(transfer_buffer(), non_neg_integer(), IOList) ->
    {IOList, transfer_buffer()}
when
    IOList :: iolist().
%% Nothing left to consume: hand back what was collected.
take_chunk([] = Empty, _BytesLeft, Taken) ->
    {Taken, Empty};
%% Budget exhausted: everything still pending is returned, wrapped in a list.
take_chunk(Pending, BytesLeft, Taken) when BytesLeft =< 0 ->
    {Taken, [Pending]};
%% Single element: either it fits entirely, or it is split at the budget.
take_chunk([Only], BytesLeft, Taken) ->
    case do_take_chunk([Only], BytesLeft) of
        {done, Whole} ->
            {[Taken, Whole], []};
        {more, {Head, Tail}} ->
            {[Taken, Head], [Tail]}
    end;
%% Two elements: consume the first; if it fits, continue with the second
%% using whatever budget remains, otherwise stop at the split point.
take_chunk([First, Second], BytesLeft, Taken) ->
    case do_take_chunk(First, BytesLeft) of
        {done, Whole} ->
            StillLeft = BytesLeft - iolist_size(Whole),
            take_chunk([Second], StillLeft, [Taken, Whole]);
        {more, {Head, Tail}} ->
            {[Taken, Head], [Tail, Second]}
    end.

%% Decides whether `Data' fits within `ChunkSize' bytes.
%%
%% Returns `{done, Data}' (data untouched) when its iolist size does not
%% exceed `ChunkSize'. Otherwise the data is flattened into a binary and split
%% at `ChunkSize', yielding `{more, {Head, Tail}}'.
do_take_chunk(Data, ChunkSize) ->
    do_take_chunk(Data, ChunkSize, iolist_size(Data)).

%% Helper dispatching on the precomputed size of `Data'.
do_take_chunk(Data, ChunkSize, Size) when Size =< ChunkSize ->
    {done, Data};
do_take_chunk(Data, ChunkSize, _Size) ->
    Flat = iolist_to_binary(Data),
    {more, split_binary(Flat, ChunkSize)}.

%% ensure_transfer_buffer_shape([_, _] = Buffer) ->
%% Buffer;
%% ensure_transfer_buffer_shape(Data) ->
%% [Data].

%% Builds the identifier for block number `N': its decimal representation,
%% left-padded with `0' characters to a width of 32, returned as a binary.
block_id(N) ->
    Digits = integer_to_list(N),
    Padded = string:pad(Digits, 32, leading, $0),
    list_to_binary(Padded).
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,9 @@ t_aggreg_pending_upload_restart(Config) ->
#{
<<"parameters">> =>
#{
<<"max_block_size">> => <<"1024B">>
<<"aggregation">> => #{
<<"min_block_size">> => <<"1024B">>
}
}
}
)
Expand Down

This file was deleted.

0 comments on commit 81964f1

Please sign in to comment.