Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cria worker para converter arquivos PDF para TXT #119

Merged
merged 10 commits into from
Jul 18, 2023
9 changes: 7 additions & 2 deletions apps/cotacoes_etl/lib/cotacoes_etl/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ defmodule CotacoesETL.Application do
use Application

alias CotacoesETL.Workers.Pesagro.BoletinsFetcher
alias CotacoesETL.Workers.Pesagro.CotacaoConverter

@impl true
def start(_, _) do
children =
if config_env() != :test do
[BoletinsFetcher, {Finch, name: PescarteHTTPClient}]
if config_env() != :test or should_fetch_pesagro_cotacoes?() do
[BoletinsFetcher, CotacaoConverter, {Finch, name: PescarteHTTPClient}]
else
[{Finch, name: PescarteHTTPClient}]
end
Expand All @@ -19,4 +20,8 @@ defmodule CotacoesETL.Application do
# Reads the `:config_env` entry of the `:cotacoes_etl` application environment
# (returns `nil` when the key is not configured).
defp config_env, do: Application.get_env(:cotacoes_etl, :config_env)

# Reads the `:fetch_pesagro_cotacoes` flag of the `:cotacoes_etl` application
# environment, defaulting to `false` when it is not configured.
defp should_fetch_pesagro_cotacoes?,
  do: Application.get_env(:cotacoes_etl, :fetch_pesagro_cotacoes, false)
end
12 changes: 12 additions & 0 deletions apps/cotacoes_etl/lib/cotacoes_etl/handlers.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule CotacoesETL.Handlers do
  @moduledoc """
  Resolves, at runtime, which handler modules the ETL should use.

  Each function reads the `:cotacoes_etl` application environment and falls
  back to the default implementation when no override is configured.
  """

  alias CotacoesETL.Handlers.PesagroHandler
  alias CotacoesETL.Handlers.ZamzarHandler

  @doc """
  Returns the module configured under `:pesagro_handler`, or `PesagroHandler`
  when the key is not set.
  """
  @spec pesagro_handler() :: module()
  def pesagro_handler, do: configured(:pesagro_handler, PesagroHandler)

  @doc """
  Returns the module configured under `:zamzar_handler`, or `ZamzarHandler`
  when the key is not set.
  """
  @spec zamzar_handler() :: module()
  def zamzar_handler, do: configured(:zamzar_handler, ZamzarHandler)

  # Looks up `key` in the `:cotacoes_etl` environment with a fallback value.
  defp configured(key, default), do: Application.get_env(:cotacoes_etl, key, default)
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule CotacoesETL.Handlers.IManagePesagroHandler do
  @moduledoc """
  Behaviour for modules that download Pesagro bulletins and extract ZIP
  archives of bulletins.
  """

  alias Cotacoes.Models.Cotacao

  @doc """
  Downloads the bulletin referenced by the given `Cotacao` into the storage
  directory (first argument) and returns the path of the written file.
  """
  @callback download_boletim_from_pesagro!(Path.t(), Cotacao.t()) :: Path.t()

  @doc """
  Extracts the ZIP archive at the first path into the storage directory given
  as second argument.

  Returns one path per extracted file, hence a list rather than a single path.
  """
  @callback extract_boletins_zip!(Path.t(), Path.t()) :: [Path.t()]
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
defmodule CotacoesETL.Handlers.IManageZamzarHandler do
  @moduledoc """
  Behaviour for modules that push Pesagro PDFs to Zamzar and fetch the
  converted TXT files back.
  """

  alias CotacoesETL.Schemas.Zamzar.File, as: FileEntry
  alias CotacoesETL.Schemas.Zamzar.Job

  @doc """
  Downloads the converted TXT of a Zamzar job into the storage directory
  (second argument) and returns the downloaded file entry.
  """
  @callback download_pesagro_txt!(Job.t(), Path.t()) :: FileEntry.t()

  @doc """
  Uploads the PDF at the given path for conversion and returns the created
  job. The integer is a time offset.
  """
  # NOTE(review): the offset looks like a delay in seconds before the TXT
  # download is attempted — confirm against the implementation.
  @callback upload_pesagro_pdf!(Path.t(), integer()) :: Job.t()
end
44 changes: 44 additions & 0 deletions apps/cotacoes_etl/lib/cotacoes_etl/handlers/pesagro_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule CotacoesETL.Handlers.PesagroHandler do
  @moduledoc """
  Default `IManagePesagroHandler` implementation: downloads bulletins through
  the configured Pesagro API module and extracts ZIP archives with `Unzip`.
  """

  import CotacoesETL.Integrations, only: [pesagro_api: 0]

  alias Cotacoes.Handlers.CotacaoHandler
  alias CotacoesETL.Handlers.IManagePesagroHandler

  require Logger

  @behaviour IManagePesagroHandler

  @doc """
  Downloads the file at `cotacao.link`, writes it inside `storage_path` and
  flags the `cotacao` as downloaded.

  Returns the path of the written file. Raises if the download, the write or
  the update of the `cotacao` fails.
  """
  @impl true
  def download_boletim_from_pesagro!(storage_path, cotacao) do
    content = pesagro_api().download_file!(cotacao.link)
    base_name = CotacaoHandler.get_cotacao_file_base_name(cotacao)
    # Path.join/2 works whether or not `storage_path` ends with a slash.
    file_path = Path.join(storage_path, base_name)
    File.write!(file_path, content)
    {:ok, _cotacao} = CotacaoHandler.set_cotacao_downloaded(cotacao)

    file_path
  end

  @doc """
  Extracts every file entry of the archive at `zip_path` into `storage_path`.

  Directory entries are skipped and missing parent directories are created.
  Returns the list of written paths. Raises `ArgumentError` when an entry
  name would resolve outside `storage_path`, and raises on any I/O failure.
  """
  @impl true
  def extract_boletins_zip!(zip_path, storage_path) do
    zip_file = Unzip.LocalFile.open(zip_path)

    try do
      {:ok, unzip} = Unzip.new(zip_file)

      for entry <- Unzip.list_entries(unzip), not directory_entry?(entry) do
        path = safe_destination!(storage_path, entry.file_name)
        Logger.info("[#{__MODULE__}] => Extraindo arquivo #{path}")

        file_binary =
          unzip
          |> Unzip.file_stream!(entry.file_name)
          |> Enum.into([])
          |> IO.iodata_to_binary()

        # Entries may live in sub-directories that do not exist yet.
        path |> Path.dirname() |> File.mkdir_p!()
        File.write!(path, file_binary)

        path
      end
    after
      # Always release the file handle, even when extraction raises.
      Unzip.LocalFile.close(zip_file)
    end
  end

  # Directory entries are listed with a trailing slash and carry no content.
  defp directory_entry?(entry), do: String.ends_with?(entry.file_name, "/")

  # Builds the destination path for an archive entry, refusing names such as
  # "../../x" that would escape the storage directory (zip-slip).
  defp safe_destination!(storage_path, file_name) do
    destination = Path.join(storage_path, file_name)
    root_parts = storage_path |> Path.expand() |> Path.split()
    destination_parts = destination |> Path.expand() |> Path.split()

    inside_root? =
      destination_parts != root_parts and List.starts_with?(destination_parts, root_parts)

    if inside_root? do
      destination
    else
      raise ArgumentError,
            "zip entry #{inspect(file_name)} resolves outside of #{inspect(storage_path)}"
    end
  end
end
30 changes: 30 additions & 0 deletions apps/cotacoes_etl/lib/cotacoes_etl/handlers/zamzar_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule CotacoesETL.Handlers.ZamzarHandler do
  @moduledoc """
  Default `IManageZamzarHandler` implementation, built on the configured
  Zamzar API module.
  """

  import CotacoesETL.Integrations, only: [zamzar_api: 0]

  alias CotacoesETL.Handlers.IManageZamzarHandler
  alias CotacoesETL.Schemas.Zamzar.Job
  alias CotacoesETL.Workers.Pesagro.CotacaoConverter

  require Logger

  @behaviour IManageZamzarHandler

  @doc """
  Downloads the first target file of `job` into `storage_path` and returns
  the value produced by the Zamzar API module.

  Raises `ArgumentError` when the job has no target files.
  """
  @impl true
  def download_pesagro_txt!(%Job{} = job, storage_path) do
    txt_file = first_target_file!(job)
    # Path.join/2 works whether or not `storage_path` ends with a slash.
    path = Path.join(storage_path, txt_file.name)
    file = zamzar_api().download_converted_file!(txt_file.id, path)
    Logger.info("[#{__MODULE__}] ==> Arquivo #{inspect(file.name)} baixado da Zamzar")

    file
  end

  @doc """
  Starts a PDF-to-TXT conversion job for `pdf_path` and schedules the TXT
  download on `CotacaoConverter` after `time_offset`.

  Returns the created job.
  """
  @impl true
  def upload_pesagro_pdf!(pdf_path, time_offset) do
    job = zamzar_api().start_job!(pdf_path, "txt")
    Logger.info("[#{__MODULE__}] ==> Upload do PDF #{pdf_path} completo em Zamzar")
    CotacaoConverter.trigger_txt_download(job, time_offset)

    job
  end

  # Fails with an explicit message instead of an opaque error on `nil` when
  # the job carries no target file.
  defp first_target_file!(%Job{target_files: [file | _]}), do: file

  defp first_target_file!(%Job{id: id}) do
    raise ArgumentError, "Zamzar job #{inspect(id)} has no target files to download"
  end
end
5 changes: 5 additions & 0 deletions apps/cotacoes_etl/lib/cotacoes_etl/logic/zamzar_logic.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule CotacoesETL.Logic.ZamzarLogic do
  @moduledoc """
  Pure predicates over Zamzar schemas.
  """

  alias CotacoesETL.Schemas.Zamzar.Job

  @doc """
  Returns `true` when the job status is `:successful`, `false` for any other
  status.
  """
  @spec job_is_successful?(Job.t()) :: boolean()
  def job_is_successful?(%Job{status: :successful}), do: true
  def job_is_successful?(%Job{}), do: false
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
defmodule CotacoesETL.Workers.Pesagro.CotacaoConverter do
  @moduledoc """
  Worker responsible for listing every `cotacao` in the database that has not
  been downloaded yet, downloading its file and having each PDF converted to
  TXT. When a downloaded file is a ZIP archive, it is extracted first and the
  extracted files go through the same upload/conversion step.

  The flow is meant to end by handing the converted files over to another
  (and final) worker, `CotacaoIngester`; that hand-off is not implemented in
  this module.

  The process state is the list of file paths still waiting to be uploaded.
  """

  use GenServer

  import CotacoesETL.Handlers
  import CotacoesETL.Integrations

  alias Cotacoes.Handlers.CotacaoHandler
  alias CotacoesETL.Logic.ZamzarLogic
  alias CotacoesETL.Schemas.Zamzar.Job

  require Logger

  @storage_path "/tmp/peapescarte/cotacoes/pesagro/"

  # Downloads and extractions go over the network / disk; the 5 second default
  # of Task.await_many/1 would crash the worker on any slow transfer.
  @task_timeout :timer.minutes(5)

  @doc "Starts the worker registered under its module name."
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @doc "Asks the worker to download and convert every pending `cotacao`."
  @spec trigger_cotacoes_convertion() :: :ok
  def trigger_cotacoes_convertion do
    GenServer.cast(__MODULE__, :convert)
  end

  @doc "Asks the worker to upload the file paths it currently holds."
  @spec trigger_pdf_upload() :: :ok
  def trigger_pdf_upload do
    GenServer.cast(__MODULE__, :upload_pdf)
  end

  @doc "Asks the worker to extract the ZIP archives among the paths it holds."
  @spec trigger_zip_extraction() :: :ok
  def trigger_zip_extraction do
    GenServer.cast(__MODULE__, :extract_zip)
  end

  @doc """
  Schedules, `time_offset` seconds from now, an attempt to download the TXT
  produced by `job`.
  """
  @spec trigger_txt_download(Job.t(), integer) :: :ok
  def trigger_txt_download(job, time_offset) do
    :timer.apply_after(second_to_ms(time_offset), GenServer, :cast, [
      __MODULE__,
      {:download_txt, job}
    ])

    :ok
  end

  @impl true
  def init(_) do
    # mkdir_p!/1 already succeeds when the directory exists.
    File.mkdir_p!(@storage_path)

    # Start with an empty list so `:upload_pdf` / `:extract_zip` are safe even
    # when they arrive before the first `:convert`.
    {:ok, []}
  end

  @impl true
  def handle_cast(:convert, _state) do
    Logger.info(
      "[#{__MODULE__}] ==> Iniciando a conversão pdf->txt de cotações pendentes da Pesagro"
    )

    cotacoes_to_upload = CotacaoHandler.find_cotacoes_not_downloaded()

    Logger.info(
      "#{__MODULE__} ==> #{length(cotacoes_to_upload)} cotações para serem convertidas da Pesagro"
    )

    tasks =
      for cotacao <- cotacoes_to_upload do
        # Space the downloads one second apart.
        Process.sleep(second_to_ms(1))

        Task.async(fn ->
          file_path = pesagro_handler().download_boletim_from_pesagro!(@storage_path, cotacao)
          Logger.info("[#{__MODULE__}] ==> Arquivo #{file_path} baixado com sucesso da Pesagro")
          file_path
        end)
      end

    files_path = Task.await_many(tasks, @task_timeout)

    Logger.info("[#{__MODULE__}] ==> #{length(files_path)} arquivos foram baixados da Pesagro")

    if Enum.any?(files_path, &zip?/1) do
      trigger_zip_extraction()
    else
      trigger_pdf_upload()
    end

    {:noreply, files_path}
  end

  def handle_cast(:extract_zip, files_path) do
    {zips, others} = Enum.split_with(files_path, &zip?/1)

    pdfs =
      zips
      |> Enum.map(fn zip_path ->
        Task.async(fn ->
          pesagro_handler().extract_boletins_zip!(zip_path, @storage_path)
        end)
      end)
      |> Task.await_many(@task_timeout)
      |> List.flatten()

    Logger.info("[#{__MODULE__}] ==> #{length(pdfs)} PDFs extraídos de arquivos ZIP da Pesagro")

    trigger_pdf_upload()

    # The archives are replaced in the state by the files extracted from them.
    {:noreply, others ++ pdfs}
  end

  def handle_cast(:upload_pdf, pdfs_paths) do
    for {pdf, idx} <- Enum.with_index(pdfs_paths) do
      Logger.info(
        "[#{__MODULE__}] ==> Fazendo upload do PDF/Cotação #{pdf} para conversão para txt na Zamzar"
      )

      # Uploads are staggered one second apart; each one asks for its TXT
      # download `idx + 2` seconds after the upload completes.
      :timer.apply_after(second_to_ms(idx), zamzar_handler(), :upload_pesagro_pdf!, [pdf, idx + 2])
    end

    {:noreply, []}
  end

  # The state is left untouched: this message can arrive between `:convert`
  # and `:upload_pdf`, and resetting it here would drop the pending paths.
  def handle_cast({:download_txt, %Job{} = job}, state) do
    job = zamzar_api().retrieve_job!(job.id)

    if ZamzarLogic.job_is_successful?(job) do
      zamzar_handler().download_pesagro_txt!(job, @storage_path)
    else
      # NOTE(review): a job that never reaches `:successful` is polled every
      # two seconds indefinitely — confirm whether failed states need a stop.
      trigger_txt_download(job, 2)
    end

    {:noreply, state}
  end

  @impl true
  def handle_info(:schedule_convertion, state) do
    trigger_cotacoes_convertion()
    {:noreply, state}
  end

  defp zip?(path), do: String.ends_with?(path, "zip")

  # Converts seconds to milliseconds.
  defp second_to_ms(s), do: s * 1000
end
3 changes: 2 additions & 1 deletion apps/cotacoes_etl/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ defmodule CotacoesETL.MixProject do
{:explorer, "~> 0.5.0"},
{:unzip, "~> 0.8"},
{:mox, "~> 1.0", only: :test},
{:cotacoes, in_umbrella: true}
{:cotacoes, in_umbrella: true},
{:database, in_umbrella: true}
]
end
end
16 changes: 16 additions & 0 deletions apps/cotacoes_etl/test/cotacoes_etl/logic/zamzar_logic_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule CotacoesETL.Logic.ZamzarLogicTest do
  use ExUnit.Case, async: true

  alias CotacoesETL.Logic.ZamzarLogic
  alias CotacoesETL.Schemas.Zamzar.Job

  describe "job_is_successful?/1" do
    test "quando um job tem sucesso, deve retornar true" do
      # A job whose status is `:successful` satisfies the predicate.
      finished_job = %Job{status: :successful}

      assert ZamzarLogic.job_is_successful?(finished_job)
    end

    test "quando um job não tem sucesso, deve retornar false" do
      # Any other status, here `:converting`, does not.
      pending_job = %Job{status: :converting}

      refute ZamzarLogic.job_is_successful?(pending_job)
    end
  end
end