<a href="https://colab.research.google.com/github/mvslopes/Fluxo-de-Dados-SDBE/blob/main/03_Construindo_C%C3%B3digos_ETL_com_Python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##**SPRINT 02- OBJETIVOS**

O principal objetivo da Sprint 02 deste projeto é avançar na implementação do Data Warehouse, focando especificamente no desenvolvimento do processo de ETL (Extração, Transformação e Carregamento).

Para atingir esse objetivo, a equipe se concentrará em construir os códigos em **Python e configurar o Apache Airflow** para automatizar e gerenciar as etapas do ETL.

Os principais passos desta sprint incluem:

**Desenvolvimento das Rotinas de Extração:** Será criado o código Python responsável por extrair os dados das fontes de origem. Isso garantirá que os dados sejam coletados de maneira eficiente e confiável.

**Transformação dos Dados:** Ocrrerá as transformações necessárias nos dados extraídos. Isso pode incluir limpeza, formatação, agregação e enriquecimento dos dados para prepará-los para análise.

**Carregamento no Data Warehouse:** Após a transformação, os dados serão carregados no Data Warehouse, onde serão armazenados de forma estruturada e organizada. **O Apache Airflow** será configurado para agendar e executar essas tarefas de carregamento de maneira automatizada e programada.


Ao final desta sprint, espera-se que o processo de ETL esteja completamente implementado e funcional, permitindo que os dados sejam coletados, transformados e carregados no Data Warehouse de forma automatizada e confiável. Isso será essencial para a construção de um ambiente de análise de dados robusto e eficaz.

##**01- Construindo os códigos ETL com a Linguagem Python**

**Objetivo:**\
Após a modelagem do Data Warehouse, a criação das tabelas de dimensões e da tabela Fato, foi construído o processo de ETL - Extração, Transformação e Carregamento com a linguagem Python e o Apache Airflow.


###**1.1- Primeiro Bloco de Código " O processo de Imports"**

In [None]:
# Primeiro Bloco de comandos os "Imports"
import csv
import airflow
import time
import pandas as pd
from datetime import datetime
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago


**Comentando os código do Pacote imports**

* **| import "CSV"** | : Importa o módulo "csv" em Python. Nossa fonte de dados é um arquivo CSV. Portanto, foi necessário manipular esses arquivos em algum momento.

* **| import "airflow" |** : importa o pacote "airflow" em Python pois foi necessário construir toda a DAG (Directed Acyclic Graph, ou Grafo Direcionado Acíclico), que será lida pelo Apache Airflow.

* **| import "time" |** : Importa o módulo "time" em Python. fornece funcionalidades relacionadas ao tempo e à temporização.

* **| import pandas as pd |** : Importa o módulo "pandas" em Python que foi renomeado para "pd" para facilitar referências posteriores. O "pandas" é uma biblioteca amplamente usada para manipulação e análise de dados. Forneceu estruturas de dados e funções para trabalhar com dados tabulares.

**Em seguida, temos os imports específicos do Apache Airflow:**

* **| from datetime import datetime | e | from datetime import timedelta | :**\
 Importa as classes **datetime e timedelta** do módulo **datetime** para lidar com datas e horários.

* **| from airflow import DAG | :** Importa a classe **DAG** do pacote **airflow**, que é fundamental para definir e configurar fluxos de trabalho no Apache Airflow.

* **| from airflow.operators.python_operator import PythonOperator | :** Importa a classe **PythonOperator** do **pacote airflow.operators.python_operator**, que permite executar tarefas Python como parte de uma DAG.

* **| from airflow.operators.postgres_operator import PostgresOperator | :** Importa a classe **PostgresOperator do pacote airflow.operators.postgres_operator**, que é usada para executar tarefas relacionadas ao PostgreSQL em uma DAG.

* **| from airflow.utils.dates import days_ago |:** Importa a função **days_ago** do pacote **airflow.utils.dates**, que é útil para calcular datas com base na contagem regressiva de dias a partir da data atual.

 ---

###**1.2- Segundo Bloco de Código " Criação da DAG - Directed Acyclic Graphs"**
###**Tópico Argumentos**

In [None]:
# Argumentos
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


**Comentando o código do Argumentos**

**Argumentos:** representa um direcionário de argumentos

* **| 'owner': 'airflow' |** : Indica que o próprietario ou responsável pela DAG é o Airflow.

* **| start_date': datetime(2023, 1, 1) |** : Indica que a data e a hora que a DAG começará a ser agendada e executada. A DAG começará a ser executada a partir do primeiro dia de janeiro de 2023. A partir desse ponto, a DAG será agendada de acordo com o cronograma especificado, que pode ser definido usando as outras configurações, como **schedule_interval.**

 A data  foi retroativa **datetime(2023, 1, 1)**, pois quando criamos uma GAG, nos não conseguimos mais modificar esta data, dessa forma é sempre bom colocarmos uma data retroativa, para que comecemos a execuação no momento que começarmos a DAG.

* **| 'depends_on_past': False' |** : Indica que a execução atual da DAG não depende do sucesso da execução anterior. Ou seja, cada execução da DAG ocorrerá independentemente do resultado das execuções anteriores.

* **| 'retries': 1 |** : Indica que em caso de falha na execução da tarefa, haverá 01 tentativa de retomar a tarefa. Se a tarefa falhar novamente após essa tentativa, não haverá mais retentativas, e a execução será considerada como falha permanente.

* **| 'retry_delay': timedelta(minutes=5) |** : Indica que o intervalo de tempo que deve ser aguardado antes de tentar novamente uma tarefa que falhou. Neste caso, após uma falha, a tarefa será reagendada para uma nova tentativa após 5 minutos. Isso permite um atraso entre as tentativas para evitar sobrecarregar o sistema em caso de falhas frequentes.

---




###**Tópico criação da DAG**

In [None]:
# Criar a DAG

# https://crontab.guru/
dag_log_solutions = DAG(dag_id = "logsol",
                   default_args = default_args,
                   schedule_interval = '0 0 * * *',
                   dagrun_timeout = timedelta(minutes = 60),
                   description = 'Job ETL de Carga no DW com Airflow',
                   start_date = airflow.utils.dates.days_ago(1)
)


**Comentando o código do Criar DAG**


* **|dag_log_solutions = DAG(dag_id = "logsol"|** : Indica que a DAG está sendo nomeada como "dag_log_solutions_dsa" com o ID "logsol".

* **| default_args = default_args |** : Indica a Definição doo argumento e o conteúdo desse argumento que foi a **lista de agumentos definida na etapa anterior.**

* **| schedule_interval = '0 0 * * *' |** : Indica a Definição do intervalo de agendamento: No caso de "schedule_interval = '0 0 * * *'", isso significa o seguinte:

 "0 0 * * *"é uma expressão cron que define a programação. Ela indica que a DAG será executada diariamente à meia-noite (00:00) todos os dias.
Portanto, a DAG será agendada para ser executada uma vez por dia, sempre à meia-noite, de acordo com esse cronograma.

* **| dagrun_timeout = timedelta(minutes = 60) |** : Indica a Definição do tempo máximo permitido para a execução de uma instância da DAG (DAG run). Isso significa que cada execução da DAG tem um limite de tempo de 60 minutos (1 hora). Se a execução da DAG não for concluída dentro desse limite de tempo, ela será considerada falha.

* **| description = 'Job ETL de Carga no DW com Airflow' |** : Indica a Definição de uma descrição ou um resumo do que é o trabalho (job) realizado pela DAG.

* **| start_date = airflow.utils.dates.days_ago(1) |** : Indica que o start Date será um dia anterior a data que vamos criar no Airflow, ou seja  indica que a DAG começará a ser executada um dia atrás da data atual.
Essa configuração permite agendar a DAG para ser executada a partir de um ponto no passado, o que pode ser útil em cenários em que você deseja retroativamente executar tarefas ou processar dados a partir de uma data anterior.

**Obs:** Temos também um start_date no  default_args, irá gerar conflito? Não gerará conflito e se tivermos o argumentos em dois locais, vale por ultimo o que estiver a DAG

**Dessa forma temos a DAG criada que é o bloco principal para execuação do Apahe Airflow.**

---



###**1.3- Terceiro Bloco de Código " Função para Carregar dados no DW**

In [None]:
def func_carrega_dados_clientes():

    # Obter o caminho do arquivo CSV
    csv_file_path = '/opt/airflow/dags/dados/DIM_CLIENTE.csv'

    # Inicializa o contador
    i = 0

    # Abrir o Arquivo CSV
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Inserir dados na tabela PostgreSQL

            sql_query_cli =

            "INSERT INTO varejo.DIM_CLIENTE (%s) VALUES (%s)" %

             (','.join(dados_cli.keys()),

             ','.join([item for item in dados_cli.values()]))

            # Operador do Postgres com incremento no id da tarefa
            # (para cada linha inserida)

            postgres_operator = PostgresOperator
             (task_id = 'carrega_dados_clientes_' + str(i),
                                                 sql = sql_query_cli,
                                                 params = (dados_cli),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute()


tarefa_carrega_dados_clientes = PythonOperator(
        task_id = 'tarefa_carrega_dados_clientes',
        python_callable = func_carrega_dados_clientes,
        provide_context = True,
        dag = dag_log_solutions
    )


###**Comentando os tópicos da Função**



**Tópico: Obter o caminho do arquivo CSV**

In [None]:
def func_carrega_dados_clientes():

    # Obter o caminho do arquivo CSV
    csv_file_path = '/opt/airflow/dags/dados/DIM_CLIENTE.csv'




* **| def func_carrega_dados_clientes() | :** Esta linha define uma função chamada **func_carrega_dados_clientes** sem parâmetros.

* **| csv_file_path = '/ opt/airflow/dags/dados/DIM_CLIENTE.csv' | :** Esta linha cria uma variável chamada **csv_file_path** que armazena uma string contendo o caminho do arquivo CSV.\
caminho do arquivo é **'/ opt/airflow/dags/dados/DIM_CLIENTE.csv'**

---

**Tópico: Inicializar o contador**

In [None]:
# Inicializa o contador
    i = 0

* **| i = 0 | :** Isso inicializa uma variável chamada "i" com o valor 0. Essa variável será usada como um contador para contar o número de linhas no arquivo CSV.

---

**Tópico: Abrir o Arquivo CSV e Incrementar um contador**

In [None]:
# Abrir o Arquivo CSV
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

* **| with open(csv_file_path, 'r') as f| :** Esta linha abre um arquivo CSV especificado pelo caminho **csv_file_path** em modo de leitura **('r')**. O uso do bloco **with** garante que o arquivo seja fechado automaticamente após o término do processamento. O arquivo aberto é representado pela variável **f**.

* **| reader = csv.DictReader(f) | :** Aqui, é criado um objeto **DictReader** do módulo **csv**. O objetivo deste objeto é ler as linhas do arquivo CSV e tratar a primeira linha como cabeçalho, onde os nomes das colunas são usados como chaves de dicionário. O objeto **DictReader** é inicializado com o arquivo **f**, permitindo que ele leia o conteúdo do arquivo.

* **| for item in reader | :** Este é um **loop for** que itera sobre cada linha do **arquivo CSV**. A cada iteração, item conterá uma linha do **arquivo CSV** representada como um dicionário, onde as chaves do dicionário são os nomes das colunas e os valores são os dados daquela linha.

* **| i += 1 | :** Dentro do **loop for**, esta linha incrementa o contador **"i"** em 1 a cada iteração. O **contador "i"** é usado para acompanhar o número de linhas **(registros)** no arquivo CSV. Cada vez que uma nova linha é lida, **"i"** é incrementado, o que permite contar quantas linhas existem no arquivo.

**Obs: No geral, esse código tem a finalidade:**
* Abrir um arquivo CSV;
* ler cada linha do arquivo;
* Contar o número total de linhas (registros) no arquivo e armazenar esse número na variável "i".
* Isso pode ser útil para obter informações sobre o tamanho do arquivo ou para rastrear o progresso de um processo que envolve a leitura de um arquivo CSV.

---

**Tópico: Criando um novo dicionário**

In [None]:
# Criando um novo  dicionário
            dados_cli = dict(item)

O comando **dados_cli = dict(item)** serve para converter um dicionário Python em outro dicionário. Mais especificamente, ele cria uma cópia independente do dicionário representado pela variável **item** e armazena essa cópia na variável **dados_cli**.

Aqui está o que cada parte do comando faz:

* **| dict(item) |** : Esta parte do comando cria um novo dicionário utilizando a **função dict()**. O argumento passado para **dict()** é a variável **item**, que é um **dicionário existente**. Isso efetivamente cria uma cópia do dicionário item.

* **| dados_cli |=** : Esta parte do comando atribui a cópia do dicionário à variável dados_cli. Portanto, após a execução dessa linha, a variável **dados_cli** contém uma cópia independente dos dados contidos em item.

Essa cópia pode ser útil para fazer alterações nos dados sem afetar o dicionário original **item**. Em essência, isso permite trabalhar com duas versões separadas dos dados, uma em **item** e outra em **dados_cli**, facilitando a manipulação e o processamento de informações em seu código.

---







**Tópico: Inserir dados na tabela PostgreSQL**

In [None]:
 sql_query_cli =

 "INSERT INTO varejo.DIM_CLIENTE (%s) VALUES (%s)"

 % (','.join(dados_cli.keys()),

 ','.join([item for item in dados_cli.values()]))



* **| sql_query_cli | :** Indica uma variável que armazena a **consulta SQL** que será executada posteriormente em um banco de dados. A consulta é usada para inserir dados em uma tabela chamada **varejo.DIM_CLIENTE**.


* **| INSERT INTO varejo.DIM_CLIENTE (%s) VALUES (%s)" :** Esta parte da string é a estrutura básica de uma consulta **SQL de inserção**. Ela começa com a instrução **SQL INSERT INTO**, seguida pelo nome do schema e tabela na qual os dados serão inseridos, que é **varejo.DIM_CLIENTE**. Em seguida, há dois marcadores de espaço reservado **%s** que serão substituídos pelos valores reais.

* **| ' , '.join(dados_cli.keys()) |**:
* **dados_cli.keys**() : retorna uma lista das chaves (nomes das colunas) de um dicionário chamado dados_cli. Essas chaves correspondem às colunas da tabela varejo.DIM_CLIENTE.

* ' , '.**join**(...) : é usado para unir essas chaves em uma única string, separando-as por vírgulas. Isso cria a lista de nomes de colunas na consulta SQL. Por exemplo, se as chaves forem ['nome', 'idade'], isso se tornará "nome,idade".
','.join([item for item in dados_cli.values()]):

* **dados_cli.values()** : Retorna uma lista dos valores associados às chaves no dicionário dados_cli. Esses valores são os dados que serão inseridos nas colunas correspondentes da tabela.

* **[item for item in dados_cli.values()]** : É uma compreensão de lista que cria uma lista dos valores do dicionário dados_cli. Por exemplo, se os valores forem ['João', 30], isso permanecerá igual: ['João', 30].

* **' ,'.join(...)** : É usado novamente para unir esses valores em uma única string, separando-os por vírgulas. Isso cria a lista de valores que corresponderão às colunas na consulta SQL. Por exemplo, se os valores forem ['João', 30], isso se tornará "'João',30".

---

**Tópico:  Operador do Postgres com incremento no id da tarefa (para cada linha inserida)**

In [None]:
 # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_clientes_' + str(i),
                                                 sql = sql_query_cli,
                                                 params = (dados_cli),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)



* **| postgres_operator = PostgresOperator(...) | :** Aqui, estamos criando uma instância de PostgresOperator e atribuindo-a à variável **postgres_operator**. Esta instância representa uma tarefa que será executada no fluxo de trabalho do **Apache Airflow.**

* **|task_id = 'carrega_dados_clientes_' + str(i) | :** O parâmetro **task_id** é um identificador exclusivo para a tarefa. Ele está sendo definido dinamicamente com base no valor da variável i. Isso significa que cada tarefa terá um identificador único, com **"carrega_dados_clientes_"** seguido pelo valor de **i** convertido em string.

* **| sql = sql_query_cli | :** O parâmetro sql contém a consulta SQL que será executada pelo operador. sql_query_cli deve ser uma string que contém a consulta SQL que extrairá dados da fonte de dados.

* **| params = (dados_cli) | :** O parâmetro params permite passar parâmetros para a consulta SQL. dados_cli é uma variável que contém os parâmetros a serem usados na consulta SQL. Esses parâmetros podem ser usados para filtrar ou personalizar a consulta.

* **| postgres_conn_id = 'LOGDW' | :** O parâmetro postgres_conn_id especifica a conexão com o banco de dados PostgreSQL que será usada para executar a consulta. Nesse caso, a conexão é identificada pelo nome "LOGDW", que deve estar configurado nas conexões do Airflow.

* **| dag = dag_log_solutions | :** O parâmetro dag especifica a qual fluxo de trabalho (DAG - Directed Acyclic Graph) essa tarefa pertencerá. As tarefas fazem parte de um fluxo de trabalho maior, e dag_log_solutions é a DAG à qual essa tarefa será adicionada.

Resumindo, o código cria uma tarefa que utiliza o **operador PostgresOperator** para executar uma **consulta SQL no banco de dados PostgreSQL.** A consulta é definida pela variável **sql_query_cli**, e os parâmetros são passados por meio da variável **dados_cli**. Cada tarefa tem um identificador exclusivo baseado no valor de **i** e faz parte da **DAG denominada dag_log_solutions**. Essa estrutura é usada para automatizar a carga de dados do **PostgreSQL** como parte de um fluxo de trabalho no **Apache Airflow.**
___






**Tópico: Executar o operador**

In [None]:
# Executa o operador
postgres_operator.execute()

O código executa o método **execute()** em uma instância do operador **PostgresOperator.**

* **| postgres_operator |**:\
É uma instância do operador **PostgresOperator**, que provavelmente foi criada anteriormente no código. Este operador representa uma tarefa que executará uma consulta SQL em um banco de dados **PostgreSQL.**

* **| execute() |**:\
É um método da classe **PostgresOperator** ou de uma classe pai na hierarquia. Quando este método é chamado, ele inicia a execução da tarefa representada pelo **operador**. Em outras palavras, **ele dispara a execução da consulta SQL definida na tarefa.**

Portanto, ao chamar **postgres_operator.execute()**, o código está instruindo o **Apache Airflow** para iniciar a execução da tarefa representada por **postgres_operator**. Isso fará com que a consulta SQL seja enviada ao banco de dados **PostgreSQL** especificado no operador, e a tarefa será executada conforme definido no **DAG (Directed Acyclic Graph)** ao qual a tarefa pertence.

Em resumo, essa linha de código é responsável por acionar a execução da tarefa que contém a **consulta SQL** no contexto de um fluxo de trabalho gerenciado pelo **Apache Airflow.**

----






**Tópico: Carregar os dados na tabela Cliente**

In [None]:
Tarefa_carrega_dados_clientes = PythonOperator(
        task_id = 'tarefa_carrega_dados_clientes',
        python_callable = func_carrega_dados_clientes,
        provide_context = True,
        dag = dag_log_solutions
    )


O código cria uma instância da **classe PythonOperator**, que é usada no **Apache Airflow** para definir tarefas que executarão código Python.

* **|tarefa_carrega_dados_clientes | :** Isso é uma variável que representa a tarefa que está sendo definida. O nome **tarefa_carrega_dados_clientes** é um identificador único para essa tarefa dentro do **DAG (Directed Acyclic Graph)**. Você usará esse identificador para referenciar e controlar essa tarefa em outras partes do código.

* **| task_id |** : É um parâmetro obrigatório e é o identificador único da tarefa dentro do DAG. No exemplo, o task_id é definido como **'tarefa_carrega_dados_clientes'**, o que significa que esta tarefa será conhecida pelo nome **'tarefa_carrega_dados_clientes'.**

* **| python_callable |** : É um parâmetro que especifica a **função Python** que será executada quando a tarefa for acionada. Neste caso, **func_carrega_dados_clientes** é a função Python que será chamada. Isso significa que quando essa tarefa for executada, o código contido em **func_carrega_dados_clientes** será executado.

* **|provide_context |** : É um parâmetro opcional que determina se o contexto do **Airflow** deve ser fornecido como argumento para a função Python. Se **provide_context** for definido como **True**, o contexto do **Airflow**, que inclui informações sobre a execução da tarefa, será passado como um dicionário para a função **func_carrega_dados_clientes**. Isso permite que a função Python acesse informações sobre a execução, como a data de início, data de término, variáveis definidas globalmente e muito mais.

* **|dag |** : É o DAG ao qual esta tarefa pertence. Um DAG é um fluxo de trabalho composto por tarefas interconectadas. A tarefa que está sendo definida aqui faz parte do DAG chamado **dag_log_solutions.**

Em resumo, o código cria uma tarefa chamada **'tarefa_carrega_dados_clientes'** que executa a função Python **func_carrega_dados_clientes** e tem acesso ao contexto do **Airflow.** Essa tarefa pode ser adicionada a um fluxo de trabalho (DAG) e programada para ser executada conforme necessário dentro do fluxo de trabalho.

---

###**Observação Importante**:
Os dados que vão compor a tabela **Fato** e as dimensções abaixo
* **Transportadoras**;
* **Depósitos**;
* **Entregas**;
* **Frete**;
* **Pagamentos**
* **Data**

Passaram pelo mesmo processo acima de criação dos codigos, ou seja o mesmo processo de criação do processo ETL - Extração, Transformação e carregamento desses dados que foi submedido os dados que vão compor a tebala cliente do DW, foram aplicados aos dados das demais dimensões e Fato que serão carregados no DW.

---

In [None]:
tarefa_trunca_tb_fato = PostgresOperator(task_id = 'tarefa_trunca_tb_fato', postgres_conn_id = 'LOGDW',\
sql = "TRUNCATE TABLE varejo.TB_FATO CASCADE", dag = dag_log_solutions)