Skip to content

Commit

Permalink
Allow for S3 Ingress records with no bytes transferred
Browse files Browse the repository at this point in the history
  • Loading branch information
mobileoverlord committed Jul 1, 2019
1 parent 5562c48 commit 5278aca
Showing 1 changed file with 26 additions and 14 deletions.
Expand Up @@ -18,18 +18,23 @@ defmodule NervesHubWebCore.Firmwares.Transfer.S3Ingress do
S3.list_objects(bucket)
|> ExAws.stream!()
|> Enum.each(fn %{key: object_key} ->
Logger.info(fn -> "Processing: #{inspect(object_key)}" end)

response =
S3.get_object(bucket, object_key)
|> ExAws.request()

case response do
{:ok, %{body: log}} ->
Logger.info(fn -> "Decoding log: #{inspect(object_key)}" end)

errors =
decode_log(log)
|> List.flatten()
|> Enum.reduce([], fn params, errors ->
case Firmwares.create_firmware_transfer(params) do
{:ok, _transfer} ->
Logger.info(fn -> "Inserting record: #{inspect(params)}" end)
errors

{:error, error} ->
Expand All @@ -39,6 +44,7 @@ defmodule NervesHubWebCore.Firmwares.Transfer.S3Ingress do
end)

if errors == [] do
Logger.info(fn -> "Cleaning up: #{inspect(object_key)}" end)
S3.delete_object(bucket, object_key) |> ExAws.request()
else
Logger.error(fn -> "Error inserting transfers from object: #{inspect(object_key)}" end)
Expand Down Expand Up @@ -87,20 +93,26 @@ defmodule NervesHubWebCore.Firmwares.Transfer.S3Ingress do
Map.get(record, "key")
|> decode_key()

remote_ip = Map.get(record, "remote_ip")
timestamp = Map.get(record, "time") |> decode_time()
bytes_sent = Map.get(record, "bytes_sent") |> String.to_integer()
bytes_total = Map.get(record, "object_size") |> String.to_integer()

{:ok,
%{
org_id: org_id,
firmware_uuid: firmware_uuid,
remote_ip: remote_ip,
bytes_sent: bytes_sent,
bytes_total: bytes_total,
timestamp: timestamp
}}
with {:ok, remote_ip} <- Map.fetch(record, "remote_ip"),
{:ok, timestamp} <- Map.fetch(record, "time"),
timestamp <- decode_time(timestamp),
{:ok, bytes_sent} <- Map.fetch(record, "bytes_sent"),
{bytes_sent, _} <- Integer.parse(bytes_sent),
{:ok, bytes_total} <- Map.fetch(record, "object_size"),
{bytes_total, _} <- Integer.parse(bytes_total) do
{:ok,
%{
org_id: org_id,
firmware_uuid: firmware_uuid,
remote_ip: remote_ip,
bytes_sent: bytes_sent,
bytes_total: bytes_total,
timestamp: timestamp
}}
else
_ ->
{:error, :invalid_transfer_record}
end

_ ->
{:error, :invalid_transfer_record}
Expand Down

0 comments on commit 5278aca

Please sign in to comment.