##**Código Python para ETL- Extração, Transformação e Carregamento**





In [None]:
# Imports
import csv
import airflow
import time
import pandas as pd
from datetime import datetime
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago

# Argumentos
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Cria a DAG
# https://crontab.guru/
dag_log_solutions = DAG(dag_id = "logsol",
                   default_args = default_args,
                   schedule_interval = '0 0 * * *',
                   dagrun_timeout = timedelta(minutes = 60),
                   description = 'Job ETL de Carga no DW com Airflow',
                   start_date = airflow.utils.dates.days_ago(1)
)



##### Tabela de Clientes #####

def func_carrega_dados_clientes(**kwargs):

    # Get the csv file path
    csv_file_path = kwargs['params']['csv_file_path']

    # Inicializa o contador
    i = 0

    # Open the csv file
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Define as colunas e placeholders para a consulta SQL
            columns = ', '.join(dados_cli.keys())
            placeholders = ', '.join(['%s'] * len(dados_cli))

            # Consulta SQL com placeholders
            sql_query_cli = f"INSERT INTO varejo.DIM_CLIENTE ({columns}) VALUES ({placeholders})"

            # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_clientes_' + str(i),
                                                 sql = sql_query_cli,
                                                 parameters = list(dados_cli.values()),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute(context = kwargs)


tarefa_carrega_dados_clientes = PythonOperator(
        task_id = 'tarefa_carrega_dados_clientes',
        python_callable = func_carrega_dados_clientes,
        provide_context = True,
        op_kwargs = {'params': {'csv_file_path': '/opt/airflow/dags/dados/DIM_CLIENTE.csv'}},
        dag = dag_log_solutions
    )


##### Tabela de Transportadoras #####

def func_carrega_dados_transportadora(**kwargs):

    # Get the csv file path
    csv_file_path = kwargs['params']['csv_file_path']

    # Inicializa o contador
    i = 0

    # Open the csv file
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Define as colunas e placeholders para a consulta SQL
            columns = ', '.join(dados_cli.keys())
            placeholders = ', '.join(['%s'] * len(dados_cli))

            # Consulta SQL com placeholders
            sql_query_cli = f"INSERT INTO varejo.DIM_TRANSPORTADORA ({columns}) VALUES ({placeholders})"

            # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_transportadora_' + str(i),
                                                 sql = sql_query_cli,
                                                 parameters = list(dados_cli.values()),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute(context = kwargs)


tarefa_carrega_dados_transportadora = PythonOperator(
        task_id = 'tarefa_carrega_dados_transportadora',
        python_callable = func_carrega_dados_transportadora,
        provide_context = True,
        op_kwargs = {'params': {'csv_file_path': '/opt/airflow/dags/dados/DIM_TRANSPORTADORA.csv'}},
        dag = dag_log_solutions
    )



##### Tabela de Depósitos #####

def func_carrega_dados_deposito(**kwargs):

    # Get the csv file path
    csv_file_path = kwargs['params']['csv_file_path']

    # Inicializa o contador
    i = 0

    # Open the csv file
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Define as colunas e placeholders para a consulta SQL
            columns = ', '.join(dados_cli.keys())
            placeholders = ', '.join(['%s'] * len(dados_cli))

            # Consulta SQL com placeholders
            sql_query_cli = f"INSERT INTO varejo.DIM_DEPOSITO ({columns}) VALUES ({placeholders})"

            # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_deposito_' + str(i),
                                                 sql = sql_query_cli,
                                                 parameters = list(dados_cli.values()),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute(context = kwargs)


tarefa_carrega_dados_deposito = PythonOperator(
        task_id = 'tarefa_carrega_dados_deposito',
        python_callable = func_carrega_dados_deposito,
        provide_context = True,
        op_kwargs = {'params': {'csv_file_path': '/opt/airflow/dags/dados/DIM_DEPOSITO.csv'}},
        dag = dag_log_solutions
    )


##### Tabela de Entregas #####

def func_carrega_dados_entrega(**kwargs):

  # Get the csv file path
    csv_file_path = kwargs['params']['csv_file_path']

    # Inicializa o contador
    i = 0

    # Open the csv file
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Define as colunas e placeholders para a consulta SQL
            columns = ', '.join(dados_cli.keys())
            placeholders = ', '.join(['%s'] * len(dados_cli))

            # Consulta SQL com placeholders
            sql_query_cli = f"INSERT INTO varejo.DIM_ENTREGA ({columns}) VALUES ({placeholders})"

            # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_entrega_' + str(i),
                                                 sql = sql_query_cli,
                                                 parameters = list(dados_cli.values()),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute(context = kwargs)


tarefa_carrega_dados_entrega = PythonOperator(
        task_id = 'tarefa_carrega_dados_entrega',
        python_callable = func_carrega_dados_entrega,
        provide_context = True,
        op_kwargs = {'params': {'csv_file_path': '/opt/airflow/dags/dados/DIM_ENTREGA.csv'}},
        dag = dag_log_solutions
    )



##### Tabela de Frete #####

def func_carrega_dados_frete(**kwargs):

     # Get the csv file path
    csv_file_path = kwargs['params']['csv_file_path']

    # Inicializa o contador
    i = 0

    # Open the csv file
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Define as colunas e placeholders para a consulta SQL
            columns = ', '.join(dados_cli.keys())
            placeholders = ', '.join(['%s'] * len(dados_cli))

            # Consulta SQL com placeholders
            sql_query_cli = f"INSERT INTO varejo.DIM_FRETE ({columns}) VALUES ({placeholders})"

            # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_frete_' + str(i),
                                                 sql = sql_query_cli,
                                                 parameters = list(dados_cli.values()),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute(context = kwargs)


tarefa_carrega_dados_frete = PythonOperator(
        task_id = 'tarefa_carrega_dados_frete',
        python_callable = func_carrega_dados_frete,
        provide_context = True,
        op_kwargs = {'params': {'csv_file_path': '/opt/airflow/dags/dados/DIM_FRETE.csv'}},
        dag = dag_log_solutions
    )





##### Tabela de Tipos de Pagamentos #####

def func_carrega_dados_pagamento(**kwargs):

     # Get the csv file path
    csv_file_path = kwargs['params']['csv_file_path']

    # Inicializa o contador
    i = 0

    # Open the csv file
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Define as colunas e placeholders para a consulta SQL
            columns = ', '.join(dados_cli.keys())
            placeholders = ', '.join(['%s'] * len(dados_cli))

            # Consulta SQL com placeholders
            sql_query_cli = f"INSERT INTO varejo.DIM_PAGAMENTO ({columns}) VALUES ({placeholders})"

            # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_pagamento_' + str(i),
                                                 sql = sql_query_cli,
                                                 parameters = list(dados_cli.values()),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute(context = kwargs)


tarefa_carrega_dados_pagamento = PythonOperator(
        task_id = 'tarefa_carrega_dados_pagamento',
        python_callable = func_carrega_dados_pagamento,
        provide_context = True,
        op_kwargs = {'params': {'csv_file_path': '/opt/airflow/dags/dados/DIM_PAGAMENTO.csv'}},
        dag = dag_log_solutions
    )




##### Tabela de Data #####

def func_carrega_dados_data(**kwargs):

     # Get the csv file path
    csv_file_path = kwargs['params']['csv_file_path']

    # Inicializa o contador
    i = 0

    # Open the csv file
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Define as colunas e placeholders para a consulta SQL
            columns = ', '.join(dados_cli.keys())
            placeholders = ', '.join(['%s'] * len(dados_cli))

            # Consulta SQL com placeholders
            sql_query_cli = f"INSERT INTO varejo.DIM_DATA ({columns}) VALUES ({placeholders})"

            # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_data_' + str(i),
                                                 sql = sql_query_cli,
                                                 parameters = list(dados_cli.values()),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute(context = kwargs)


tarefa_carrega_dados_data = PythonOperator(
        task_id = 'tarefa_carrega_dados_data',
        python_callable = func_carrega_dados_data,
        provide_context = True,
        op_kwargs = {'params': {'csv_file_path': '/opt/airflow/dags/dados/DIM_DATA.csv'}},
        dag = dag_log_solutions
    )


##### Tabela de Fatos #####

def func_carrega_dados_fatos(**kwargs):

    # Get the csv file path
    csv_file_path = kwargs['params']['csv_file_path']

    # Inicializa o contador
    i = 0

    # Open the csv file
    with open(csv_file_path, 'r') as f:

        reader = csv.DictReader(f)

        for item in reader:

            # Icrementa o contador
            i += 1

            # Extrai uma linha como dicionário
            dados_cli = dict(item)

            # Define as colunas e placeholders para a consulta SQL
            columns = ', '.join(dados_cli.keys())
            placeholders = ', '.join(['%s'] * len(dados_cli))

            # Consulta SQL com placeholders
            sql_query_cli = f"INSERT INTO varejo.TB_FATO ({columns}) VALUES ({placeholders})"

            # Operador do Postgres com incremento no id da tarefa (para cada linha inserida)
            postgres_operator = PostgresOperator(task_id = 'carrega_dados_fatos_' + str(i),
                                                 sql = sql_query_cli,
                                                 parameters = list(dados_cli.values()),
                                                 postgres_conn_id = 'LOGDW',
                                                 dag = dag_log_solutions)

            # Executa o operador
            postgres_operator.execute(context = kwargs)


tarefa_carrega_dados_fatos = PythonOperator(
        task_id = 'tarefa_carrega_dados_fatos',
        python_callable = func_carrega_dados_fatos,
        provide_context = True,
        op_kwargs = {'params': {'csv_file_path': '/opt/airflow/dags/dados/TB_FATOS.csv'}},
        dag = dag_log_solutions
    )



# Tarefas para limpar as tabelas
tarefa_trunca_tb_fato = PostgresOperator(task_id = 'tarefa_trunca_tb_fato', postgres_conn_id = 'LOGDW', sql = "TRUNCATE TABLE varejo.TB_FATO CASCADE", dag = dag_log_solutions)
tarefa_trunca_dim_cliente = PostgresOperator(task_id = 'tarefa_trunca_dim_cliente', postgres_conn_id = 'LOGDW', sql = "TRUNCATE TABLE varejo.DIM_CLIENTE CASCADE", dag = dag_log_solutions)
tarefa_trunca_dim_pagamento = PostgresOperator(task_id = 'tarefa_trunca_dim_pagamento', postgres_conn_id = 'LOGDW', sql = "TRUNCATE TABLE varejo.DIM_PAGAMENTO CASCADE", dag = dag_log_solutions)
tarefa_trunca_dim_frete = PostgresOperator(task_id = 'tarefa_trunca_dim_frete', postgres_conn_id = 'LOGDW', sql = "TRUNCATE TABLE varejo.DIM_FRETE CASCADE", dag = dag_log_solutions)
tarefa_trunca_dim_data = PostgresOperator(task_id = 'tarefa_trunca_dim_data', postgres_conn_id = 'LOGDW', sql = "TRUNCATE TABLE varejo.DIM_DATA CASCADE", dag = dag_log_solutions)
tarefa_trunca_dim_transportadora = PostgresOperator(task_id = 'tarefa_trunca_dim_transportadora', postgres_conn_id = 'LOGDW', sql = "TRUNCATE TABLE varejo.DIM_TRANSPORTADORA CASCADE", dag = dag_log_solutions)
tarefa_trunca_dim_entrega = PostgresOperator(task_id = 'tarefa_trunca_dim_entrega', postgres_conn_id = 'LOGDW', sql = "TRUNCATE TABLE varejo.DIM_ENTREGA CASCADE", dag = dag_log_solutions)
tarefa_trunca_dim_deposito = PostgresOperator(task_id = 'tarefa_trunca_dim_deposito', postgres_conn_id = 'LOGDW', sql = "TRUNCATE TABLE varejo.DIM_DEPOSITO CASCADE", dag = dag_log_solutions)

# Upstream
tarefa_trunca_tb_fato >> tarefa_trunca_dim_cliente >> tarefa_trunca_dim_pagamento >> tarefa_trunca_dim_frete >> tarefa_trunca_dim_data >> tarefa_trunca_dim_transportadora >> tarefa_trunca_dim_entrega >> tarefa_trunca_dim_deposito >> tarefa_carrega_dados_clientes >> tarefa_carrega_dados_transportadora >> tarefa_carrega_dados_deposito >> tarefa_carrega_dados_entrega >> tarefa_carrega_dados_frete >> tarefa_carrega_dados_pagamento >> tarefa_carrega_dados_data >> tarefa_carrega_dados_fatos

# Bloco main
if __name__ == "__main__":
    dag_log_solutions.cli()



