In [1]:
pip install pyspark python-decouple pymongo

Note: you may need to restart the kernel to use updated packages.


In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, upper, last, monotonically_increasing_id, udf
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from decouple import config

from mongo.connection import DBconnectionHandler
from mongo.MongoDBLoaderFromS3 import MongoDBLoaderFromS3
from decouple import config

import sys

sys.path.insert(0, '/home/user/project')

spark = SparkSession.builder \
    .appName("Spark_S3_dados") \
    .config("spark.hadoop.fs.s3a.access.key", f"{config('AWS_ACCESS_KEY')}") \
    .config("spark.hadoop.fs.s3a.secret.key", f"{config('AWS_SECRET_KEY')}") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Inicia a conexão com MongoDB
mongo_handler = DBconnectionHandler()
mongo_handler.connect_to_db()
db = mongo_handler.get_db_connection()

In [3]:
def limpar_nomes_colunas(df):
    """
    Remove caracteres especiais dos nomes das colunas
    Exemplo: 'Quantidade (L.)' -> 'Quantidade_L'
    """
    mapa_renomear = {
        "Quantidade (L.)": "Quantidade_L",
        "Quantidade (Kg)": "Quantidade_Kg",
        "Quantidade (KG)": "Quantidade_Kg",
        "Valor (US$)": "Valor_US",
        "Quantidade (L.)": "Quantidade_L",
    }
    
    for col_name in df.columns:
        if col_name in mapa_renomear:
            novo_nome = mapa_renomear[col_name]
            df = df.withColumnRenamed(col_name, novo_nome)
        else:
            # Para outros caracteres especiais
            novo_nome = col_name.replace(" ", "_").replace("(", "").replace(")", "").replace(".", "").replace("$", "")
            if col_name != novo_nome:
                df = df.withColumnRenamed(col_name, novo_nome)
    
    return df

In [4]:
def corrigir_caracteres_corrompidos(df, colunas):
    """
    Corrige caracteres corrompidos (como 'Espumante OrgÃ¢nico') em colunas de um DataFrame Spark.

    Args:
        df: DataFrame Spark
        colunas: Lista de nomes de colunas que precisam de correção

    Returns:
        DataFrame Spark com as colunas corrigidas
    """
    # Mapeamento de caracteres corrompidos para corretos
    mapa_caracteres = {
        "Ã¢": "â",
        "Ã©": "é",
        "Ã§": "ç",
        "Ã³": "ó",
        "Ãª": "ê",
        "Ã ": "à",
        "Ãº": "ú",
        "Â°": "°",
        "Ã£": "ã", # adicionar mais conforme encontrar
    }

    def corrigir_texto(texto):
        if texto is None:
            return None
        for k, v in mapa_caracteres.items():
            texto = texto.replace(k, v)
        return texto

    # Criar UDF Spark
    corrigir_udf = udf(corrigir_texto, StringType())

    # Aplicar UDF em todas as colunas desejadas
    for coluna in colunas:
        df = df.withColumn(coluna, corrigir_udf(col(coluna)))

    return df

In [5]:
def remover_virgulas_e_pontos(df, colunas):
    """
    Remove vírgulas e pontos de colunas de texto em um DataFrame Spark.

    Args:
        df: DataFrame Spark
        colunas: Lista de nomes de colunas que precisam da limpeza

    Returns:
        DataFrame Spark com as colunas limpas
    """
    def limpar_texto(texto):
        if texto is None:
            return None
        return texto.replace(",", "").replace(".", "").replace("-", " ").replace("/", " ")
    
    limpar_udf = udf(limpar_texto, StringType())
    
    for coluna in colunas:
        df = df.withColumn(coluna, limpar_udf(col(coluna)))
    
    return df

In [6]:
def processar_arquivo(caminho_s3, colunas_categorias=None, coluna_quantidade=None):
    """
    Processa arquivo parquet com detecção e limpeza de dados
    
    Args:
        caminho_s3: Caminho completo do arquivo no S3
        colunas_categorias: Lista de colunas que contêm categorias (headers em maiúsculas)
        coluna_quantidade: Nome da coluna de quantidade para substituir "-" por 0 (usa nome APÓS limpeza)
    """
    df = spark.read.parquet(caminho_s3)
    
    # Limpar nomes das colunas
    df = limpar_nomes_colunas(df)
    
    # Se houver colunas de quantidade, tratar "-" como 0
    if coluna_quantidade:
        if isinstance(coluna_quantidade, list):
            for col_qty in coluna_quantidade:
                df = df.withColumn(
                    col_qty,
                    when(col(col_qty) == "-", 0).otherwise(col(col_qty))
                )
        else:
            df = df.withColumn(
                coluna_quantidade,
                when(col(coluna_quantidade) == "-", 0).otherwise(col(coluna_quantidade))
            )
    
    return df

In [7]:
def processar_com_categorias(df, coluna_categoria, coluna_nome_item, colunas_quantidade):
    """
    Identifica categorias (linhas em MAIÚSCULAS) e propaga para itens abaixo
    
    Args:
        df: DataFrame do Spark
        coluna_categoria: Coluna que contém as categorias
        coluna_nome_item: Coluna que contém nomes de itens
        colunas_quantidade: Lista de colunas de quantidade para limpar
    """
    # Criar coluna temporária para identificar categorias
    df = df.withColumn(
        "categoria_temp",
        when(col(coluna_categoria) == upper(col(coluna_categoria)), col(coluna_categoria))
    )
    
    # Criar índice para preservar ordem
    df = df.withColumn("idx", monotonically_increasing_id())
    
    # Janela para forward fill
    janela = Window.orderBy("idx").rowsBetween(Window.unboundedPreceding, 0)
    df = df.withColumn("Categoria", last("categoria_temp", ignorenulls=True).over(janela))
    
    # Filtrar: remover linhas que são cabeçalhos (maiúsculas)
    df_detalhes = df.filter(~(col(coluna_categoria) == upper(col(coluna_categoria))))
    
    # Remover linha com "Total" se existir
    df_detalhes = df_detalhes.filter(col(coluna_categoria) != "Total")
    
    # Remover colunas auxiliares
    df_final = df_detalhes.drop("categoria_temp", "idx")
    
    return df_final

In [8]:
print("=" * 60)
print("PROCESSANDO: Produção de Vinhos/Sucos/Derivados")
print("=" * 60)

df1 = processar_arquivo(
    f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/bronze/table_Producao_de_vinhos_sucos_e_derivados.parquet",
    coluna_quantidade=["Quantidade_L"]
)

df1 = remover_virgulas_e_pontos(df1, colunas=["Produto"])

df1_final = processar_com_categorias(
    df1,
    coluna_categoria="Produto",
    coluna_nome_item="Produto",
    colunas_quantidade=["Quantidade_L"]
)

print("\nPrimeiras linhas do resultado:")
df1_final.show(47, truncate=False)
print(f"Total de registros: {df1_final.count()}")

PROCESSANDO: Produção de Vinhos/Sucos/Derivados

Primeiras linhas do resultado:
+----+----------------------------------------+------------+-----------------------------+
|Ano |Produto                                 |Quantidade_L|Categoria                    |
+----+----------------------------------------+------------+-----------------------------+
|2024|Tinto                                   |86.404.980  |VINHO DE MESA                |
|2024|Branco                                  |14.682.722  |VINHO DE MESA                |
|2024|Rosado                                  |1.543.578   |VINHO DE MESA                |
|2024|Tinto                                   |10.594.848  |VINHO FINO DE MESA (VINIFERA)|
|2024|Branco                                  |10.780.605  |VINHO FINO DE MESA (VINIFERA)|
|2024|Rosado                                  |1.170.128   |VINHO FINO DE MESA (VINIFERA)|
|2024|Suco de uva integral                    |66.326.591  |SUCO                         |
|2024|Suco

In [9]:
print("\n" + "=" * 60)
print("PROCESSANDO: Exportação de Suco de Uva")
print("=" * 60)

df2 = processar_arquivo(
    f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/bronze/table_Exportacao_de_suco_de_uva.parquet",
    coluna_quantidade=["Quantidade_Kg", "Valor_US"]
)

df2 = remover_virgulas_e_pontos(df2, colunas=["Países"])

# Este arquivo não tem categorias, apenas lista de países
print("\nDados processados:")
df2.show(20, truncate=False)
print(f"Total de registros: {df2.count()}")


PROCESSANDO: Exportação de Suco de Uva

Dados processados:
+----+--------------+------------------------------+-------------+--------+
|Ano |Tipo          |Países                        |Quantidade_Kg|Valor_US|
+----+--------------+------------------------------+-------------+--------+
|2024|Vinhos de mesa|Afeganistão                   |0            |0       |
|2024|Vinhos de mesa|África do Sul                 |103          |1.783   |
|2024|Vinhos de mesa|Alemanha República Democrática|6.666        |48.095  |
|2024|Vinhos de mesa|Angola                        |0            |0       |
|2024|Vinhos de mesa|Anguilla                      |0            |0       |
|2024|Vinhos de mesa|Antígua e Barbuda             |447          |3.329   |
|2024|Vinhos de mesa|Antilhas Holandesas           |0            |0       |
|2024|Vinhos de mesa|Arábia Saudita                |32           |54      |
|2024|Vinhos de mesa|Argélia                       |6            |87      |
|2024|Vinhos de mesa|Argenti

In [10]:
print("\n" + "=" * 60)
print("PROCESSANDO: Importação de Suco de Uva")
print("=" * 60)

df3 = processar_arquivo(
    f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/bronze/table_Importacao_de_suco_de_uva.parquet",
    coluna_quantidade=["Quantidade_Kg", "Valor_US"]
)

df3 = remover_virgulas_e_pontos(df3, colunas=["Países"])

print("\nDados processados:")
df3.show(20, truncate=False)
print(f"Total de registros: {df3.count()}")


PROCESSANDO: Importação de Suco de Uva

Dados processados:
+----+--------------+-----------------------+-------------+-----------+
|Ano |Tipo          |Países                 |Quantidade_Kg|Valor_US   |
+----+--------------+-----------------------+-------------+-----------+
|2024|Vinhos de mesa|Africa do Sul          |658.238      |2.133.775  |
|2024|Vinhos de mesa|Alemanha               |121.002      |805.466    |
|2024|Vinhos de mesa|Argélia                |0            |0          |
|2024|Vinhos de mesa|Arábia Saudita         |0            |0          |
|2024|Vinhos de mesa|Argentina              |26.272.478   |93.869.579 |
|2024|Vinhos de mesa|Armênia                |0            |0          |
|2024|Vinhos de mesa|Austrália              |422.720      |1.437.842  |
|2024|Vinhos de mesa|Áustria                |17.796       |104.965    |
|2024|Vinhos de mesa|Bermudas               |0            |0          |
|2024|Vinhos de mesa|Bélgica                |0            |0          |
|202

In [11]:
print("\n" + "=" * 60)
print("PROCESSANDO: Comercialização de Vinhos e Derivados")
print("=" * 60)

# Carregar arquivo Parquet
df4 = processar_arquivo(
    f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/bronze/table_Comercializacao_de_vinhos_e_derivados.parquet",
    coluna_quantidade="Quantidade_L"
)

# Corrigir caracteres corrompidos nas colunas necessárias
df4 = corrigir_caracteres_corrompidos(df4, colunas=["Produto"])  # adicione outras colunas se precisar

df4 = remover_virgulas_e_pontos(df4, colunas=["Produto"])

# Processar categorias
df4_final = processar_com_categorias(
    df4,
    coluna_categoria="Produto",
    coluna_nome_item="Produto",
    colunas_quantidade=["Quantidade_L"]
)

print("\nDados processados:")
df4_final.show(20, truncate=False)
print(f"Total de registros: {df4_final.count()}")


PROCESSANDO: Comercialização de Vinhos e Derivados

Dados processados:
+----+----------------------------------------+------------+-------------------------------+
|Ano |Produto                                 |Quantidade_L|Categoria                      |
+----+----------------------------------------+------------+-------------------------------+
|2024|Tinto                                   |0           |VINHO DE MESA                  |
|2024|Rosado                                  |0           |VINHO DE MESA                  |
|2024|Branco                                  |0           |VINHO DE MESA                  |
|2024|Tinto                                   |0           |VINHO FINO DE MESA             |
|2024|Rosado                                  |0           |VINHO FINO DE MESA             |
|2024|Branco                                  |0           |VINHO FINO DE MESA             |
|2024|Tinto                                   |0           |VINHO ESPECIAL                 

In [12]:
print("\n" + "=" * 60)
print("PROCESSANDO: Processamento de uvas")
print("=" * 60)

df5 = processar_arquivo(
    f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/bronze/table_Uvas_sem_classificacao_processadas.parquet",
    coluna_quantidade="Quantidade_Kg"
)

df5 = remover_virgulas_e_pontos(df5, colunas=["Cultivar"])

df5_final = processar_com_categorias(
    df5,
    coluna_categoria="Cultivar",
    coluna_nome_item="Cultivar",
    colunas_quantidade=["Quantidade_Kg"]
)

print("\nDados processados:")
df5_final.show(20, truncate=False)
print(f"Total de registros: {df5_final.count()}")


PROCESSANDO: Processamento de uvas

Dados processados:
+----+---------+------------------+-------------+---------+
|Ano |Tipo     |Cultivar          |Quantidade_Kg|Categoria|
+----+---------+------------------+-------------+---------+
|2024|Viníferas|Alicante Bouschet |2.302.750    |TINTAS   |
|2024|Viníferas|Ancelota          |374.633      |TINTAS   |
|2024|Viníferas|Aramon            |0            |TINTAS   |
|2024|Viníferas|Alfrocheiro       |0            |TINTAS   |
|2024|Viníferas|Arinarnoa         |40.924       |TINTAS   |
|2024|Viníferas|Aspirant Bouschet |115.469      |TINTAS   |
|2024|Viníferas|Barbera           |12.666       |TINTAS   |
|2024|Viníferas|Bonarda           |2.359        |TINTAS   |
|2024|Viníferas|Cabernet Franc    |1.622.225    |TINTAS   |
|2024|Viníferas|Cabernet Sauvignon|3.321.069    |TINTAS   |
|2024|Viníferas|Caladoc           |2.714        |TINTAS   |
|2024|Viníferas|Campanario        |0            |TINTAS   |
|2024|Viníferas|Canaiolo          |0        

In [13]:
print("\n" + "=" * 60)
print("SALVANDO RESULTADOS")
print("=" * 60)

try:
    df1_final.write.mode("overwrite").parquet(f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/producao_vinhos/")
    print("✓ Produção de vinhos salvo")
except Exception as e:
    print(f"✗ Erro ao salvar produção: {e}")

try:
    df2.write.mode("overwrite").parquet(f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/exportacao_suco/")
    print("✓ Exportação de suco salvo")
except Exception as e:
    print(f"✗ Erro ao salvar exportação: {e}")

try:
    df3.write.mode("overwrite").parquet(f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/importacao_suco/")
    print("✓ Importação de suco salvo")
except Exception as e:
    print(f"✗ Erro ao salvar importação: {e}")

try:
    df4_final.write.mode("overwrite").parquet(f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/comercializacao_vinhos/")
    print("✓ Comercialização de vinhos salvo")
except Exception as e:
    print(f"✗ Erro ao salvar comercialização: {e}")

try:
    df5_final.write.mode("overwrite").parquet(f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/uvas_processadas/")
    print("✓ Uvas processadas salvo")
except Exception as e:
    print(f"✗ Erro ao salvar uvas: {e}")

print("\n✓ Processamento concluído!")


SALVANDO RESULTADOS
✓ Produção de vinhos salvo
✓ Exportação de suco salvo
✓ Importação de suco salvo
✓ Comercialização de vinhos salvo
✓ Uvas processadas salvo

✓ Processamento concluído!


In [14]:
# Função para enviar DataFrame Spark para MongoDB
def save_df_to_mongo(df, collection_name):
    # Converte DataFrame Spark em lista de dicts
    data = [row.asDict() for row in df.collect()]
    
    if data:
        collection = db[collection_name]
        # Apaga a coleção se já existir (opcional)
        collection.drop()
        collection.insert_many(data)
        print(f"✓ Coleção '{collection_name}' salva no MongoDB")
    else:
        print(f"⚠ DataFrame '{collection_name}' está vazio, nada foi salvo")

# Dicionário com os paths S3 e nomes das coleções
s3_collections = {
    "producao_vinhos": f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/producao_vinhos/",
    "exportacao_suco": f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/exportacao_suco/",
    "importacao_suco": f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/importacao_suco/",
    "comercializacao_vinhos": f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/comercializacao_vinhos/",
    "uvas_processadas": f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/uvas_processadas/"
}

# Loop para ler cada Parquet do S3 e enviar para MongoDB
for collection_name, s3_path in s3_collections.items():
    try:
        df_spark = spark.read.parquet(s3_path)
        save_df_to_mongo(df_spark, collection_name)
    except Exception as e:
        print(f"✗ Erro ao processar {collection_name}: {e}")

print("\n✓ Upload para MongoDB concluído!")


✓ Coleção 'producao_vinhos' salva no MongoDB
✓ Coleção 'exportacao_suco' salva no MongoDB
✓ Coleção 'importacao_suco' salva no MongoDB
✓ Coleção 'comercializacao_vinhos' salva no MongoDB
✓ Coleção 'uvas_processadas' salva no MongoDB

✓ Upload para MongoDB concluído!


In [18]:
# Criar instância do loader
loader = MongoDBLoaderFromS3(spark, mongo_handler)

# Definir datasets para carregar
datasets = [
    {
        's3_path': f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/producao_vinhos/",
        'collection_name': 'producao_vinhos'
    },
    {
        's3_path': f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/exportacao_suco/",
        'collection_name': 'exportacao_suco'
    },
    {
        's3_path': f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/importacao_suco/",
        'collection_name': 'importacao_suco'
    },
    {
        's3_path': f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/comercializacao_vinhos/",
        'collection_name': 'comercializacao_vinhos'
    },
    {
        's3_path': f"s3a://{config('AWS_DESTINY_KEY')}/embrapa-api/silver/uvas_processadas/",
        'collection_name': 'uvas_processadas'
    }
]

print("\n" + "=" * 60)
print("CARREGANDO DADOS PARA MONGODB")
print("=" * 60)

results = loader.load_multiple_datasets(datasets)

for result in results:
    if result['status'] == 'sucesso':
        print(f"✓ {result['collection']}: {result['records_inserted']} registros inseridos")
    else:
        print(f"✗ {result['collection']}: {result['erro']}")

print("\n✓ Processamento concluído!")


INFO:mongo.MongoDBLoaderFromS3:Lendo arquivo Parquet de s3a://



CARREGANDO DADOS PARA MONGODB


INFO:mongo.MongoDBLoaderFromS3:Total de registros lidos: 2585
INFO:mongo.MongoDBLoaderFromS3:Coleção 'producao_vinhos' preparada (dados anteriores removidos)
INFO:mongo.MongoDBLoaderFromS3:Inseridos 2585/2585 registros
INFO:mongo.MongoDBLoaderFromS3:✓ 2585 registros inseridos em 'producao_vinhos'
INFO:mongo.MongoDBLoaderFromS3:Lendo arquivo Parquet de s3a://
INFO:mongo.MongoDBLoaderFromS3:Total de registros lidos: 28435
INFO:mongo.MongoDBLoaderFromS3:Coleção 'exportacao_suco' preparada (dados anteriores removidos)
INFO:mongo.MongoDBLoaderFromS3:Inseridos 10000/28435 registros
INFO:mongo.MongoDBLoaderFromS3:Inseridos 20000/28435 registros
INFO:mongo.MongoDBLoaderFromS3:Inseridos 28435/28435 registros
INFO:mongo.MongoDBLoaderFromS3:✓ 28435 registros inseridos em 'exportacao_suco'
INFO:mongo.MongoDBLoaderFromS3:Lendo arquivo Parquet de s3a://
INFO:mongo.MongoDBLoaderFromS3:Total de registros lidos: 11275
INFO:mongo.MongoDBLoaderFromS3:Coleção 'importacao_suco' preparada (dados anteriores 

✓ producao_vinhos: 2585 registros inseridos
✓ exportacao_suco: 28435 registros inseridos
✓ importacao_suco: 11275 registros inseridos
✓ comercializacao_vinhos: 2915 registros inseridos
✓ uvas_processadas: 11495 registros inseridos

✓ Processamento concluído!
