In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.utils.dates import days_ago
import requests
import os
import shutil
import zipfile
import pyodbc
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import year, month, col, when
from pathlib import Path
from sqlalchemy import create_engine, types
import tarfile
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager


def extract_data():
    # Configuração do WebDriver para rodar no Windows
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # Executa sem abrir janela

    # Instalação automática do ChromeDriver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    # URL da página a ser acessada
    url = "https://web3.antaq.gov.br/ea/sense/download.html#pt"
    driver.get(url)

    # Pasta onde os arquivos serão baixados
    download_folder = "dados_antaq"

    # Palavras-chave para filtrar os links de download
    keywords = {"Atracacao", "Carga", "CargaConteinerizada"}
    exclusion_keywords = {"Regiao"}

    # Certifique-se de que a pasta de download existe
    os.makedirs(download_folder, exist_ok=True)

    def extract_file(file_path):
        """Descompacta arquivos .zip ou .tar.gz e exclui o arquivo compactado."""
        try:
            if file_path.endswith(".zip"):
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(download_folder)  # Extrai os arquivos na pasta de download
            elif file_path.endswith(".tar.gz") or file_path.endswith(".tar"):
                with tarfile.open(file_path, 'r:*') as tar_ref:
                    tar_ref.extractall(download_folder)  # Extrai os arquivos na pasta de download
            else:
                print(f"Formato não suportado: {file_path}")
                return

            os.remove(file_path)  # Remove o arquivo compactado após extração
            print(f"Arquivo {file_path} descompactado e pasta zip removido com sucesso.")
        except Exception as e:
            print(f"Erro ao descompactar {file_path}: {e}")

    def get_filtered_links(driver):
        """Obtém e filtra os links da página com base nas palavras-chave definidas."""
        links = driver.find_elements(By.TAG_NAME, "a")  # Encontra todos os links da página
        return [
            link.get_attribute("href") for link in links
            if link.get_attribute("href") and
            any(keyword in link.get_attribute("href") for keyword in keywords) and
            not any(ex_keyword in link.get_attribute("href") for ex_keyword in exclusion_keywords)
        ]

    def download_file(url):
        """Faz o download do arquivo a partir da URL informada."""
        try:
            response = requests.get(url, timeout=30)  # Faz a requisição do arquivo
            response.raise_for_status()  # Verifica se houve erro na requisição

            file_name = os.path.basename(url)  # Obtém o nome do arquivo a partir da URL
            file_path = os.path.join(download_folder, file_name)  # Define o caminho completo do arquivo

            with open(file_path, 'wb') as file:
                file.write(response.content)  # Salva o conteúdo baixado no arquivo

            print(f"Arquivo {file_name} baixado com sucesso.")
            extract_file(file_path)  # Chama a função para descompactar o arquivo, se necessário
        except requests.exceptions.RequestException as e:
            print(f"Erro ao baixar {url}: {e}")

    def process_year(driver, year):
        """Coleta e baixa arquivos para um ano específico."""
        print(f"Processando ano {year}...")

        try:
            # Aguarda até que o elemento select (caixa de seleção de ano) esteja visível
            select_box = WebDriverWait(driver, 10).until(
                EC.visibility_of_element_located((By.TAG_NAME, "select"))
            select_box.send_keys(str(year))  # Seleciona o ano desejado
            time.sleep(6)  # Aguarda carregamento dos links após a seleção

            links = get_filtered_links(driver)  # Obtém os links filtrados

            if links:
                print(f"{len(links)} arquivos encontrados para {year}. Iniciando download...")
                for url in links:
                    download_file(url)  # Baixa os arquivos encontrados
            else:
                print(f"Nenhum link válido encontrado para {year}.")
        except Exception as e:
            print(f"Erro ao processar o ano {year}: {e}")

    # Loop para processar os anos de 2021 a 2023
    for year in range(2021, 2024):
        process_year(driver, year)


def transform_data():
    # Inicializar a sessão do Spark
    spark = SparkSession.builder \
        .appName("ANTAQ ETL") \
        .config("spark.jars", "/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/ext/mssql-jdbc-12.2.0.jre8.jar") \
        .getOrCreate()

    # Caminho para os arquivos
    base_path = r"C:\Users\leona\dados_antaq"

    # Lista de anos para processar
    anos = [2021, 2022, 2023]

    # Lista para armazenar os DataFrames de atracação e tempos
    atracacao_dfs = []
    tempos_dfs = []

    # Ler e processar os arquivos de atracação e tempos de atracação para cada ano
    for ano in anos:
        # Ler o arquivo de atracação com delimitador ';'
        atracacao_path = os.path.join(base_path, f"{ano}Atracacao.txt")
        atracacao_df = spark.read.csv(atracacao_path, header=True, inferSchema=True, sep=";")

        # Adicionar coluna "Ano" e "Mês" em atracação a partir da coluna 'Data Atracação'
        atracacao_df = atracacao_df.withColumn("Ano", F.year(col("Data Atracação"))) \
            .withColumn("Mês", F.month(col("Data Atracação")))
        atracacao_dfs.append(atracacao_df)

        # Ler o arquivo de tempos de atracação com delimitador ';'
        tempos_path = os.path.join(base_path, f"{ano}TemposAtracacao.txt")
        tempos_df = spark.read.csv(tempos_path, header=True, inferSchema=True, sep=";")

        # Garantir que temos a coluna IDAtracacao para o join
        tempos_dfs.append(tempos_df)

    # Concatenar todos os DataFrames de atracação
    atracacao_final_df = atracacao_dfs[0]
    for df in atracacao_dfs[1:]:
        atracacao_final_df = atracacao_final_df.union(df)

    # Concatenar todos os DataFrames de tempos de atracação
    tempos_final_df = tempos_dfs[0]
    for df in tempos_dfs[1:]:
        tempos_final_df = tempos_final_df.union(df)

    # Realizar o join entre os DataFrames de atracação e tempos de atracação com base na coluna IDAtracacao
    final_df = atracacao_final_df.join(tempos_final_df, "IDAtracacao", "left")

    # Selecionar as colunas desejadas para a tabela final
    atracacao_final_df = final_df.select(
        "IDAtracacao",
        "CDTUP",
        "IDBerco",
        "Berço",
        "Porto Atracação",
        "Apelido Instalação Portuária",
        "Complexo Portuário",
        "Tipo da Autoridade Portuária",
        F.to_timestamp(col("Data Atracação"), "dd/MM/yyyy HH:mm:ss").alias("Data Atracação"),
        F.to_timestamp(col("Data Chegada"), "dd/MM/yyyy HH:mm:ss").alias("Data Chegada"),
        F.to_timestamp(col("Data Desatracação"), "dd/MM/yyyy HH:mm:ss").alias("Data Desatracação"),
        F.to_timestamp(col("Data Início Operação"), "dd/MM/yyyy HH:mm:ss").alias("Data Início Operação"),
        F.to_timestamp(col("Data Término Operação"), "dd/MM/yyyy HH:mm:ss").alias("Data Término Operação"),
        F.year(F.to_timestamp(col("Data Início Operação"), "dd/MM/yyyy HH:mm:ss")).alias("Ano da data de início da operação"),
        F.lpad(F.month(F.to_timestamp(col("Data Início Operação"), "dd/MM/yyyy HH:mm:ss")).cast("string"), 2, "0").alias("Mês da data de início da operação"),
        "Tipo de Operação",
        "Tipo de Navegação da Atracação",
        "Nacionalidade do Armador",
        "FlagMCOperacaoAtracacao",
        "Terminal",
        "Município",
        "UF",
        "SGUF",
        "Região Geográfica",
        "Nº da Capitania",
        "Nº do IMO",
        "TEsperaAtracacao",
        "TesperaInicioOp",
        "TOperacao",
        "TEsperaDesatracacao",
        "TAtracado",
        "TEstadia"
    )

    # Caminho para os arquivos
    base_path = r"C:\Users\leona\dados_antaq"

    # Listar os anos a serem processados
    anos = [2021, 2022, 2023]

    # Lista para armazenar os DataFrames de carga e outras tabelas
    carga_dfs = []
    atracacao_dfs = []
    carga_conteinerizada_dfs = []

    # Ler e processar os arquivos de carga, atracação, carga conteinerizada
    for ano in anos:
        # Ler o arquivo de carga
        carga_path = os.path.join(base_path, f"{ano}Carga.txt")
        carga_df = spark.read.csv(carga_path, header=True, inferSchema=True, sep=";")

        # Verificar se a carga é conteinerizada
        carga_df = carga_df.withColumn("Carga_Conteinerizada",
                                       when(col("Carga Geral Acondicionamento") == "Conteinerizada", True).otherwise(False))

        # Carregar a tabela CargaConteinerizada
        carga_conteinerizada_path = os.path.join(base_path, f"{ano}Carga_Conteinerizada.txt")
        carga_conteinerizada_df = spark.read.csv(carga_conteinerizada_path, header=True, inferSchema=True, sep=";")

        # Unir a tabela Carga com a CargaConteinerizada (quando for conteinerizada)
        carga_df = carga_df.join(carga_conteinerizada_df, carga_df["IDCarga"] == carga_conteinerizada_df["IDCarga"], "left") \
            .select(carga_df["*"], carga_conteinerizada_df["CDMercadoriaConteinerizada"], carga_conteinerizada_df["VLPesoCargaConteinerizada"])

        # Para carga conteinerizada, o Peso líquido será o Peso sem contêiner
        carga_df = carga_df.withColumn("Peso_liquido",
                                       when(col("Carga_Conteinerizada") == True, col("VLPesoCargaConteinerizada"))
                                       .otherwise(col("VLPesoCargaBruta")))

        # Alterar a coluna "CDMercadoria" para considerar os códigos das mercadorias da CargaConteinerizada
        carga_df = carga_df.withColumn("CDMercadoria",
                                       when(col("Carga_Conteinerizada") == True, col("CDMercadoriaConteinerizada"))
                                       .otherwise(col("CDMercadoria")))

        # Remover a coluna "CDMercadoriaConteinerizada", pois queremos apenas "CDMercadoria"
        carga_df = carga_df.drop("CDMercadoriaConteinerizada")

        # Adicionar a tabela de carga processada à lista
        carga_dfs.append(carga_df)

    # Concatenar todos os DataFrames de carga
    carga_final_df = carga_dfs[0]
    for df in carga_dfs[1:]:
        carga_final_df = carga_final_df.union(df)


def load_data():
    # Criar uma sessão do Spark
    spark = SparkSession.builder.appName("Download Parquet").getOrCreate()

    # Salvar atracacao_fato como Parquet
    atracacao_final_df.write.mode("overwrite").parquet("atracacao_fato.parquet")

    # Salvar carga_fato como Parquet
    carga_final_df.write.mode("overwrite").parquet("carga_fato.parquet")

    # Remover arquivos .crc antes de compactar
    for crc_file in Path("atracacao_fato.parquet").rglob("*.crc"):
        crc_file.unlink()
    for crc_file in Path("carga_fato.parquet").rglob("*.crc"):
        crc_file.unlink()

    # Compactar apenas os arquivos parquet
    shutil.make_archive("atracacao_fato", 'zip', "atracacao_fato.parquet")
    shutil.make_archive("carga_fato", 'zip', "carga_fato.parquet")

    # Caminhos dos arquivos ZIP
    caminho_carga = r"C:\Users\leona\carga_fato.zip"
    caminho_atracacao = r"C:\Users\leona\atracacao_fato.zip"

    # Caminhos de destino para descompactação
    caminho_destino_carga = r"C:\Users\leona\carga"
    caminho_destino_atracacao = r"C:\Users\leona\atracacao"

    # Verifica se os diretórios de destino existem; se não, cria os diretórios
    if not os.path.exists(caminho_destino_carga):
        os.makedirs(caminho_destino_carga)

    if not os.path.exists(caminho_destino_atracacao):
        os.makedirs(caminho_destino_atracacao)

    # Função para descompactar e carregar os dados Parquet
    def descompactar_e_carregar_parquet(caminho_zip, caminho_destino, nome_arquivo):
        with zipfile.ZipFile(caminho_zip, 'r') as zip_ref:
            # Extrai todos os arquivos para o diretório de destino
            zip_ref.extractall(caminho_destino)
            print(f"{nome_arquivo} extraído com sucesso para: {caminho_destino}")

            # Encontra o arquivo Parquet dentro do diretório extraído
            arquivos = zip_ref.namelist()
            arquivo_parquet = [f for f in arquivos if f.endswith('.parquet')][0]  # Pega o primeiro arquivo Parquet
            caminho_parquet = os.path.join(caminho_destino, arquivo_parquet)

            # Carrega o arquivo Parquet usando pandas
            df = pd.read_parquet(caminho_parquet)

            return df

    # Descompacta e carrega os dados
    df_carga = descompactar_e_carregar_parquet(caminho_carga, caminho_destino_carga, "carga_fato.zip")
    df_atracacao = descompactar_e_carregar_parquet(caminho_atracacao, caminho_destino_atracacao, "atracacao_fato.zip")


def load_table_a():
    # Caminho da pasta onde estão os arquivos
    pasta_atracacao = r"C:\Users\leona\atracacao"

    # Encontra o arquivo Parquet na pasta
    arquivos = os.listdir(pasta_atracacao)
    arquivo_parquet = [f for f in arquivos if f.endswith('.parquet')][0]  # Pega o primeiro arquivo Parquet
    caminho_parquet_atracacao = os.path.join(pasta_atracacao, arquivo_parquet)

    # Configurações de conexão
    server = 'localhost'  # Nome do servidor SQL Server
    database = 'master'  # Nome do banco de dados
    username = 'leo'  # Nome de usuário
    password = '12345'  # Senha

    # String de conexão
connection_string = f"mssql+pyodbc://{username}:{password}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server"

# Cria a engine de conexão com o SQL Server
engine = create_engine(connection_string)

# Lê o arquivo Parquet
df_atracacao = pd.read_parquet(caminho_parquet_atracacao)

# Função para limpar valores discrepantes
def limpar_valores_discrepantes(df):
    # Substitui "Valor Discrepante" por NaN
    df.replace("Valor Discrepante", np.nan, inplace=True)
    
    # Converte colunas numéricas para o tipo correto
    colunas_numericas = [
        'TEsperaAtracacao', 'TesperaInicioOp', 'TOperacao', 
        'TEsperaDesatracacao', 'TAtracado', 'TEstadia'
    ]
    for coluna in colunas_numericas:
        df[coluna] = pd.to_numeric(df[coluna], errors='coerce')  # Converte para numérico, inválidos viram NaN
    
    return df

# Limpa os dados
df_atracacao = limpar_valores_discrepantes(df_atracacao)

# Define o mapeamento de tipos de colunas para o SQL Server
dtype_mapping = {
    'IDAtracacao': types.INTEGER(),
    'CDTUP': types.VARCHAR(length=50),
    'IDBerco': types.VARCHAR(length=50),
    'Berco': types.VARCHAR(length=100),
    'PortoAtracacao': types.VARCHAR(length=100),
    'ApelidoInstalacaoPortuaria': types.VARCHAR(length=100),
    'ComplexoPortuario': types.VARCHAR(length=100),
    'TipoAutoridadePortuaria': types.VARCHAR(length=100),
    'DataAtracacao': types.DATETIME(),
    'DataChegada': types.DATETIME(),
    'DataDesatracacao': types.DATETIME(),
    'DataInicioOperacao': types.DATETIME(),
    'DataTerminoOperacao': types.DATETIME(),
    'AnoDataInicioOperacao': types.INTEGER(),
    'MesDataInicioOperacao': types.VARCHAR(length=50),
    'TipoOperacao': types.VARCHAR(length=100),
    'TipoNavegacaoAtracacao': types.VARCHAR(length=100),
    'NacionalidadeArmador': types.INTEGER(),
    'FlagMCOperacaoAtracacao': types.INTEGER(),
    'Terminal': types.VARCHAR(length=100),
    'Municipio': types.VARCHAR(length=100),
    'UF': types.VARCHAR(length=50),
    'SGUF': types.VARCHAR(length=50),
    'RegiaoGeografica': types.VARCHAR(length=100),
    'NumeroCapitania': types.VARCHAR(length=50),
    'NumeroIMO': types.INTEGER(),
    'TEsperaAtracacao': types.FLOAT(),
    'TesperaInicioOp': types.FLOAT(),
    'TOperacao': types.FLOAT(),
    'TEsperaDesatracacao': types.FLOAT(),
    'TAtracado': types.FLOAT(),
    'TEstadia': types.FLOAT()
}

# Nome da tabela no SQL Server
table_name = 'atracacao_fato'

# Cria a tabela no SQL Server (se não existir) e insere os dados
try:
    # Conecta ao banco de dados
    with engine.connect() as connection:
        # Cria a tabela
        df_atracacao.head(0).to_sql(
            name=table_name,
            con=connection,
            if_exists='replace',
            index=False,
            dtype=dtype_mapping
        )
        print(f"Tabela '{table_name}' criada com sucesso.")

        # Insere os dados na tabela
        df_atracacao.to_sql(
            name=table_name,
            con=connection,
            if_exists='append',
            index=False
        )
        print(f"Dados inseridos na tabela '{table_name}' com sucesso.")
except Exception as e:
    print(f"Erro ao inserir dados no SQL Server: {e}")
def load_table_c():
    # Caminho da pasta onde estão os arquivos
    pasta_carga = r"C:\Users\leona\carga"

    # Encontra o arquivo Parquet na pasta
    arquivos = os.listdir(pasta_carga)
    arquivo_parquet = [f for f in arquivos if f.endswith('.parquet')][0]  # Pega o primeiro arquivo Parquet
    caminho_parquet_carga = os.path.join(pasta_carga, arquivo_parquet)

    # Configurações de conexão
    server = 'localhost'  # Nome do servidor SQL Server
    database = 'master'   # Nome do banco de dados
    username = 'leo'      # Nome de usuário
    password = '12345'    # Senha

    # String de conexão
    connection_string = f"mssql+pyodbc://{username}:{password}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server"

    # Cria a engine de conexão com o SQL Server
    engine = create_engine(connection_string)

    # Lê o arquivo Parquet
    df_carga = pd.read_parquet(caminho_parquet_carga)

    # Função para limpar e converter os dados
    def limpar_e_converter_dados(df):
        # Substitui "Valor Discrepante" por NaN
        df.replace("Valor Discrepante", np.nan, inplace=True)
        
        # Converte colunas numéricas (que estão como string) para o tipo correto
        colunas_numericas = ['TEU', 'VLPesoCargaBruta', 'VLPesoCargaConteinerizada', 'Peso_liquido']
        for coluna in colunas_numericas:
            df[coluna] = pd.to_numeric(df[coluna], errors='coerce')  # Converte para numérico, inválidos viram NaN
        
        # Converte colunas de flags para inteiros (se forem strings)
        colunas_flags = ['FlagAutorizacao', 'FlagConteinerTamanho']
        for coluna in colunas_flags:
            if df[coluna].dtype == 'object':  # Verifica se a coluna é do tipo string
                df[coluna] = pd.to_numeric(df[coluna], errors='coerce').fillna(0).astype(int)
        
        # Converte coluna booleana
        if 'Carga_Conteinerizada' in df.columns:
            df['Carga_Conteinerizada'] = df['Carga_Conteinerizada'].astype(bool)
        
        return df

    # Limpa e converte os dados
    df_carga = limpar_e_converter_dados(df_carga)

    # Define o mapeamento de tipos de colunas para o SQL Server
    dtype_mapping = {
        'IDCarga': types.INTEGER(),
        'IDAtracacao': types.INTEGER(),
        'Origem': types.VARCHAR(length=100),
        'Destino': types.VARCHAR(length=100),
        'CDMercadoria': types.VARCHAR(length=50),
        'Tipo Operação da Carga': types.VARCHAR(length=100),
        'Carga Geral Acondicionamento': types.VARCHAR(length=100),
        'ConteinerEstado': types.VARCHAR(length=50),
        'Tipo Navegação': types.VARCHAR(length=100),
        'FlagAutorizacao': types.INTEGER(),
        'FlagCabotagem': types.INTEGER(),
        'FlagCabotagemMovimentacao': types.INTEGER(),
        'FlagConteinerTamanho': types.INTEGER(),
        'FlagLongoCurso': types.INTEGER(),
        'FlagMCOperacaoCarga': types.INTEGER(),
        'FlagOffshore': types.INTEGER(),
        'FlagTransporteViaInterioir': types.INTEGER(),
        'Percurso Transporte em vias Interiores': types.VARCHAR(length=100),
        'Percurso Transporte Interiores': types.VARCHAR(length=100),
        'STNaturezaCarga': types.VARCHAR(length=100),
        'STSH2': types.VARCHAR(length=50),
        'STSH4': types.VARCHAR(length=50),
        'Natureza da Carga': types.VARCHAR(length=100),
        'Sentido': types.VARCHAR(length=50),
        'TEU': types.FLOAT(),
        'QTCarga': types.INTEGER(),
        'VLPesoCargaBruta': types.FLOAT(),
        'Carga_Conteinerizada': types.INTEGER(),
        'VLPesoCargaConteinerizada': types.FLOAT(),
        'Peso_liquido': types.FLOAT()
    }

    # Nome da tabela no SQL Server
    table_name = 'carga_fato'

    # Cria a tabela no SQL Server (se não existir) e insere os dados
    try:
        # Conecta ao banco de dados
        with engine.connect() as connection:
            # Cria a tabela
            df_carga.head(0).to_sql(
                name=table_name,
                con=connection,
                if_exists='replace',
                index=False,
                dtype=dtype_mapping
            )
            print(f"Tabela '{table_name}' criada com sucesso.")

            # Insere os dados na tabela
            df_carga.to_sql(
                name=table_name,
                con=connection,
                if_exists='append',
                index=False
            )
            print(f"Dados inseridos na tabela '{table_name}' com sucesso.")
    except Exception as e:
        print(f"Erro ao inserir dados no SQL Server: {e}")


# Definição do DAG
with DAG(
    "dag_antaq_etl",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "email_on_failure": True,
        "email": ["seuemail@dominio.com"],
    },
    schedule_interval="@monthly",
    start_date=days_ago(1),
    catchup=False,
) as dag:

    extract_task = PythonOperator(
        task_id="extract_data",
        python_callable=extract_data,
    )

    transform_task = PythonOperator(
        task_id="transform_data",
        python_callable=transform_data,
    )

    load_task = PythonOperator(
        task_id="load_data",
        python_callable=load_data,
    )

    load_table_a_task = PythonOperator(
        task_id="load_table_a",
        python_callable=load_table_a,
    )

    load_table_c_task = PythonOperator(
        task_id="load_table_c",
        python_callable=load_table_c,
    )

    notify_success = EmailOperator(
        task_id="notify_success",
        to="leonardo_rocha18@hotmail.com",
        subject="ETL Concluído com Sucesso",
        html_content="O processo ETL da ANTAQ foi concluído com sucesso!"
    )

    # Definição da ordem de execução das tarefas
    extract_task >> transform_task >> load_task >> [load_table_a_task, load_table_c_task] >> notify_success