Skip to content

Commit

Permalink
fit original idea of upload task into new framework
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Rehfeld committed Sep 8, 2012
1 parent 3488af8 commit 2e676c5
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 202 deletions.
29 changes: 16 additions & 13 deletions apps/glacier_proxy/src/gp_http.erl
Expand Up @@ -56,16 +56,18 @@ get(Path, Params, Req) -> not_found('GET', Path, Params, Req).


post([<<"vault">>, Vault], _Params, Req) ->
{ok, {Sha, Sha256}, Req2} = process_body(Req),
{ok, {Sha, TreeSha}, Req2} = upload_body(Req),

DummyReply = {
[{<<"sha1">>, Sha},
{<<"sha256">>, Sha256},
{<<"treeSha">>, TreeSha},
{<<"date">>, <<"Sun, 2 Sep 2012 12:00:00 GMT">>},
{<<"archiveId">>, <<"EXAMPLEArchiveId">>},
{<<"location">>, <<"/12345678/vaults/", Vault/binary, "/archives/EXAMPLEArchiveId">>}]
},

error_logger:info_msg("Will reply ~p~n", [DummyReply]),

{200,
[{<<"Content-Type">>, <<"application/json">>}],
jiffy:encode(DummyReply), Req2};
Expand All @@ -81,14 +83,15 @@ not_found(_Method, _Path, _Params, Req) ->
{404, [], <<"Not found">>, Req}.


%% @doc for testing chunked requests only, @todo move into some upload handler
process_body(Req) ->
Sha = gp_chksum:sha_init(),
Sha256 = gp_chksum:sha256_init(),
process_body(cowboy_http_req:stream_body(Req), {Sha, Sha256}).
process_body({ok, Data, Req}, {Sha, Sha256}) ->
error_logger:info_msg("Got body chunk with ~p bytes~n", [byte_size(Data)]),
process_body(cowboy_http_req:stream_body(Req),
{gp_chksum:sha_update(Sha, Data), gp_chksum:sha256_update(Sha256, Data)});
process_body({done, Req}, {Sha, Sha256}) ->
{ok, {gp_chksum:sha_final(Sha), gp_chksum:sha256_final(Sha256)}, Req}.
%% @doc Stream the request body chunk-by-chunk into gp_upload and
%% return {ok, {Sha, TreeSha}, Req2} once the body is exhausted.
%% Sha is the plain archive checksum, TreeSha the Glacier tree hash
%% (both as produced by gp_upload on the finalized context).
upload_body(Req) ->
    Context = gp_upload:init(Req),
    upload_body(cowboy_http_req:stream_body(Req), Context).

upload_body({ok, Data, Req}, Context) ->
    %% log tag fixed: this module is gp_http (was mistyped "g8_http")
    error_logger:info_msg("gp_http: Got body chunk with ~p bytes~n", [byte_size(Data)]),
    upload_body(cowboy_http_req:stream_body(Req), gp_upload:update(Data, Context));
upload_body({done, Req}, Context) ->
    %% body complete: finalize the upload, then read back both checksums
    Context2 = gp_upload:final(Context),
    Sha = gp_upload:sha(Context2),
    TreeSha = gp_upload:tree_sha(Context2),
    {ok, {Sha, TreeSha}, Req}.
116 changes: 116 additions & 0 deletions apps/glacier_proxy/src/gp_upload.erl
@@ -0,0 +1,116 @@
%% @doc Streaming upload of an archive to AWS Glacier: body data is fed
%% in via update/2, split into multipart-upload parts, and completed
%% with final/1 (or abandoned with abort/1) while a running checksum is
%% maintained.
-module(gp_upload).

-include("glacier_proxy.hrl").

%% AWS Glacier allows a maximum of 10,000 parts per upload, so the
%% PART_SIZE will determine the biggest archive size that can be
%% uploaded by this service, e.g. 128 MB part size will allow an
%% archive size of up to ~1.2 TB.
-define(PART_SIZE, (128 * 1024 * 1024)). % 128 MB

%% AWS Glacier uses chunks of 1 MB to calculate the checksums on.
%% These checksums are reduced to the final tree checksum.
-define(CHUNK_SIZE, (1024 * 1024)). % 1 MB

%% NOTE(review): not referenced anywhere in this module yet —
%% presumably for the upcoming AWS HTTP connection handling; confirm.
-define(CONNECT_TIMEOUT, 1000).

%% Per-upload state threaded through init/1 -> update/2 -> final/1.
%%   req    - the cowboy request this upload originates from
%%   parts  - list of #part{}, active part at the head, finished parts
%%            behind it (see do_upload/2)
%%   shactx - running checksum context from gp_chksum:sha_init/0
%%   sha    - final digest, set by final/1
-record(context, {req, parts, shactx, sha}).
%% One Glacier multipart-upload part.
%%   start        - byte offset of this part within the archive
%%   len          - bytes fed into the part so far
%%   shas         - unused here so far; presumably the per-1MB chunk
%%                  checksums for the tree hash — TODO confirm
%%   upload_state - placeholder ('tbd') for the per-part upload handle
%%   status       - active | done
-record(part, {start, len, shas, upload_state, status}).

%% API
%% Upload lifecycle: init/1 creates a context, update/2 feeds body data,
%% final/1 completes, abort/1 abandons. abort/1 is exported here to fix
%% the unused-function warning (it was defined but unreachable).
-export([init/1, update/2, final/1, abort/1]).
%% Checksum accessors on a finalized context.
-export([sha/1, tree_sha/1]).


%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%% @doc Build a fresh upload context for Req: no parts started yet and
%% a newly initialised running checksum context.
init(Req) ->
    ShaCtx = gp_chksum:sha_init(),
    #context{req = Req,
             parts = [],
             shactx = ShaCtx}.

%% @doc Feed the next chunk of body data into the upload. The very
%% first chunk (parts == []) triggers the multipart-upload initiation;
%% every chunk is folded into the running checksum and routed into the
%% current part.
update(Data, #context{parts = [], shactx = ShaCtx} = Ctx) ->
    error_logger:info_msg("Received ~p bytes of data~n", [byte_size(Data)]),
    %% first chunk ever seen: kick off the upload before any part exists
    initiate_upload(),
    NewShaCtx = gp_chksum:sha_update(ShaCtx, Data),
    Ctx#context{parts = initiate_part(Data, []), shactx = NewShaCtx};
update(Data, #context{parts = Parts, shactx = ShaCtx} = Ctx) ->
    error_logger:info_msg("Received ~p bytes of data~n", [byte_size(Data)]),
    NewShaCtx = gp_chksum:sha_update(ShaCtx, Data),
    Ctx#context{parts = do_upload(Data, Parts), shactx = NewShaCtx}.


%% @doc Complete the upload: tell AWS the multipart upload is finished
%% (stubbed for now) and freeze the running checksum into the context so
%% it can be read back via sha/1 / tree_sha/1.
final(#context{shactx = Sha} = Ctx) ->
    %% log tag fixed: this module is gp_upload ("gs_uploader" matched
    %% no module; the deleted predecessor was gp_uploader)
    error_logger:info_msg("gp_upload: completed successfully~n", []),
    finalize_upload(),
    Ctx#context{sha = gp_chksum:sha_final(Sha)}.

%% @doc Abandon a running upload. Only notifies AWS (stubbed for now);
%% the context itself is not modified or returned.
abort(#context{}) ->
    %% log tag fixed: this module is gp_upload ("gs_uploader" matched
    %% no module; the deleted predecessor was gp_uploader)
    error_logger:info_msg("gp_upload: aborted~n", []),
    abort_upload().

%% @doc Plain checksum of the whole archive; only set after final/1
%% (undefined before that, per the #context record default).
sha(#context{sha = Sha}) -> Sha.

%% @doc Tree checksum of the archive (per-1MB-chunk checksums reduced
%% to a single value — see the ?CHUNK_SIZE note above).
%% @todo actually compute the treesha; for now this returns the plain
%% checksum from sha/1 as a placeholder.
tree_sha(#context{} = Context) -> sha(Context).


%% ===================================================================
%% Internal functions
%% ===================================================================
%% Kick off the archive upload with AWS. Called exactly once, on the
%% very first data chunk (see update/2). Currently a stub returning ok.
initiate_upload() ->
    %% @todo issue AWS Glacier Initiate Multipart Upload
    ok.

%% @doc Route a chunk of body data into the active part (the head of
%% Parts). If the chunk fills the active part up to ?PART_SIZE, the
%% part is finalized with exactly the bytes that fit and the surplus
%% opens the next part; otherwise the whole chunk is streamed into the
%% active part.
do_upload(Data, Parts) ->
    [Active | Finished] = Parts,
    BytesLeftInPart = ?PART_SIZE - Active#part.len,
    case byte_size(Data) >= BytesLeftInPart of
        true ->
            {Fill, Surplus} = split_data(Data, BytesLeftInPart),
            initiate_part(Surplus, [finalize_part(Active, Fill) | Finished]);
        false ->
            [upload_data(Active, Data) | Finished]
    end.

%% Tell AWS that all parts have been uploaded (called from final/1).
%% Currently a stub returning ok.
finalize_upload() ->
    %% @todo issue AWS Glacier Complete Multipart Upload
    ok.

%% Cancel the upload on the AWS side (called from abort/1). Currently a
%% stub returning ok.
abort_upload() ->
    %% @todo issue AWS Glacier Abort Multipart Upload if started
    ok.

%% Open a new part and feed Data into it. Done is the list of already
%% finished parts; returns the new parts list with the active part at
%% the head. Dispatches on how Data compares to ?PART_SIZE:
initiate_part(Data, Done) when byte_size(Data) > ?PART_SIZE ->
    %% more than one part's worth: carve off one full part and recurse
    %% on the remainder (may open several parts for a single chunk)
    {Data1, Data2} = split_data(Data, ?PART_SIZE),
    initiate_part(Data2, initiate_part(Data1, Done));
initiate_part(Data, Done) when byte_size(Data) =:= ?PART_SIZE ->
    %% exactly one part: open an empty part, then immediately finalize
    %% it with the whole of Data. The tail of the recursive result
    %% re-matches the already-bound Done — this works because the
    %% recursive call only ever prepends to Done.
    [Current | Done] = initiate_part(<<>>, Done),
    [finalize_part(Current, Data) | Done ];
initiate_part(Data, Done) ->
    %% partial part: the byte offset assumes every finished part is
    %% exactly ?PART_SIZE long (upheld by finalize_part/2's assertion)
    Pos = ?PART_SIZE * length(Done),
    %% @todo issue AWS Glacier Upload Part, keep reference to UploadState
    [upload_data(#part{start = Pos,
                       len = 0,
                       upload_state = tbd,
                       status = active}, Data) | Done].

%% @doc Stream Data into an in-progress part and account for its length.
%% Currently only logs and simulates network latency with a short sleep.
upload_data(#part{len = WrittenSoFar} = Part, Data) ->
    %% @todo send chunked data to socket (get UploadState from initiate_part)
    error_logger:info_msg("Would upload ~p bytes of data (delaying...)~n",
                          [byte_size(Data)]),
    timer:sleep(200),
    Part#part{len = WrittenSoFar + byte_size(Data)}.

%% @doc Stream the last Data bytes of a part and mark it done. The
%% record match asserts the completed part is exactly ?PART_SIZE long
%% (initiate_part/2's offset arithmetic depends on this invariant).
finalize_part(Current, Data) ->
    Completed = upload_data(Current, Data),
    #part{len = ?PART_SIZE} = Completed,
    %% @todo finalize HTTP request, process response
    %% @todo if a part fails, it should be retried, we would need to
    %%       accumulate the whole part in memory for this...
    Completed#part{status = done}.

%% @doc Split a binary into its first Len bytes and the remaining
%% suffix, as {Prefix, Suffix}. Fails with badarg when Len exceeds
%% byte_size(Data).
split_data(Data, Len) ->
    erlang:split_binary(Data, Len).
189 changes: 0 additions & 189 deletions apps/glacier_proxy/src/gp_uploader.erl

This file was deleted.

0 comments on commit 2e676c5

Please sign in to comment.