In [None]:
import os
import psycopg2
import csv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, to_date, dayofweek, row_number
from pyspark.sql.window import Window

In [None]:
# inicialização do etl, criando o banco, tabelas, spark, processando os arquivos e executando

class DataImporter:
    def __init__(self, db_config, arquivos, driver_jar):
        self.db_name = db_config["dbname"]
        self.db_user = db_config["user"]
        self.db_password = db_config["password"]
        self.db_host = db_config["host"]
        self.db_port = db_config["port"]
        self.arquivos = arquivos
        self.driver_jar = driver_jar
        self.spark = None

    def criar_banco(self):
        try:
            conn = psycopg2.connect(
                dbname="postgres",
                user=self.db_user,
                password=self.db_password,
                host=self.db_host
            )
            conn.autocommit = True
            cur = conn.cursor()
            cur.execute(f"SELECT 1 FROM pg_database WHERE datname = '{self.db_name}'")
            if not cur.fetchone():
                cur.execute(f"CREATE DATABASE {self.db_name}")
                print(f"🟢 Banco '{self.db_name}' criado.")
            else:
                print(f"🟡 Banco '{self.db_name}' já existe.")
            cur.close()
            conn.close()
        except Exception as e:
            print(f"❌ Erro ao criar banco: {e}")

    def iniciar_spark(self):
        self.spark = SparkSession.builder \
            .appName("Importar Arquivos para PostgreSQL") \
            .config("spark.jars", self.driver_jar) \
            .getOrCreate()

    def mapear_tipo_spark_para_postgres(self, spark_type):
        mapping = {
            "string": "TEXT",
            "int": "INTEGER",
            "bigint": "BIGINT",
            "double": "DOUBLE PRECISION",
            "float": "REAL",
            "boolean": "BOOLEAN",
            "timestamp": "TIMESTAMP",
            "date": "DATE"
        }
        return mapping.get(spark_type.lower(), "TEXT")

    def criar_tabela(self, tabela, df):
        try:
            conn = psycopg2.connect(
                dbname=self.db_name,
                user=self.db_user,
                password=self.db_password,
                host=self.db_host
            )
            cur = conn.cursor()
            schema = df.schema
            colunas_sql = []
            for field in schema.fields:
                tipo_sql = self.mapear_tipo_spark_para_postgres(field.dataType.simpleString())
                colunas_sql.append(f'"{field.name}" {tipo_sql}')
            sql_create = f'CREATE TABLE IF NOT EXISTS "{tabela}" ({", ".join(colunas_sql)});'
            cur.execute(sql_create)
            conn.commit()
            cur.close()
            conn.close()
            print(f"🟢 Tabela '{tabela}' criada com sucesso.")
        except Exception as e:
            print(f"❌ Erro ao criar tabela '{tabela}': {e}")

    def salvar_no_postgres(self, df, tabela):
        url = f"jdbc:postgresql://{self.db_host}:{self.db_port}/{self.db_name}"
        props = {
            "user": self.db_user,
            "password": self.db_password,
            "driver": "org.postgresql.Driver"
        }
        df.write.jdbc(url=url, table=tabela, mode="append", properties=props)
        print(f"✅ Dados salvos na tabela '{tabela}'.")

    def ajustar_tipos(self, df, tabela):
        if tabela == "clientes":
            df = df.withColumn("COD_ID_CLIENTE", col("COD_ID_CLIENTE").cast("int")) \
                   .withColumn("DAT_DATA_NASCIMENTO", to_date("DAT_DATA_NASCIMENTO", "yyyy-MM-dd"))
        elif tabela == "produtos":
            df = df.withColumn("COD_ID_PRODUTO", col("COD_ID_PRODUTO").cast("int")) \
                   .withColumn("COD_ID_CATEGORIA_PRODUTO", col("COD_ID_CATEGORIA_PRODUTO").cast("int")) \
                   .withColumn("COD_CODIGO_BARRAS", col("COD_CODIGO_BARRAS").cast("int"))
        elif tabela == "vendas":
            df = df.withColumn("_c0", col("_c0").cast("int")) \
                   .withColumn("COD_ID_LOJA", col("COD_ID_LOJA").cast("int")) \
                   .withColumn("COD_ID_CLIENTE", col("COD_ID_CLIENTE").cast("int")) \
                   .withColumn("NUM_ANOMESDIA", col("NUM_ANOMESDIA").cast("int")) \
                   .withColumn("COD_ID_VENDA_UNICO", col("COD_ID_VENDA_UNICO").cast("int")) \
                   .withColumn("COD_ID_PRODUTO", col("COD_ID_PRODUTO").cast("int")) \
                   .withColumn("VAL_VALOR_SEM_DESC", round(col("VAL_VALOR_SEM_DESC"), 2).cast("decimal(18,2)")) \
                   .withColumn("VAL_VALOR_DESCONTO", round(col("VAL_VALOR_DESCONTO"), 2).cast("decimal(18,2)")) \
                   .withColumn("VAL_VALOR_COM_DESC", round(col("VAL_VALOR_COM_DESC"), 2).cast("decimal(18,2)")) \
                   .withColumn("VAL_QUANTIDADE_KG", round(col("VAL_QUANTIDADE_KG"), 2).cast("decimal(18,2)"))
        return df

    def processar_arquivo(self, path, tabela):
        ext = os.path.splitext(path)[-1].lower()
        if ext == ".csv":
            df = self.spark.read.option("header", "true").option("delimiter", ";").csv(path)
        elif ext == ".json":
            df = self.spark.read.option("multiline", "true").json(path)
        else:
            print(f"⚠️ Tipo de arquivo não suportado: {path}")
            return

        # Ajustar tipos específicos
        df = self.ajustar_tipos(df, tabela)

        # Otimizar particionamento
        df = df.repartition(2)

        self.criar_tabela(tabela, df)
        self.salvar_no_postgres(df, tabela)

    def executar(self):
        self.criar_banco()
        self.iniciar_spark()
        for arquivo in self.arquivos:
            print(f"\n📁 Processando: {arquivo['path']}")
            self.processar_arquivo(arquivo["path"], arquivo["tabela"])
        self.spark.stop()


In [None]:
# Configurações e caminhos para execução
db_config = {
    "dbname": "bancopostgres",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": "5432"
}

arquivos = [
    {"path": "/vendas.csv", "tabela": "vendas"},
    {"path": "/produtos.csv", "tabela": "produtos"},
    {"path": "/clientes.json", "tabela": "clientes"}
]

driver_jar="./postgresql-42.7.7.jar"

importador = DataImporter(db_config, arquivos, driver_jar)
importador.executar()


In [None]:
# Configuração do banco
db_config = {
    "dbname": "bancopostgres",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": "5432"
}

# Cria diretório para salvar CSVs
output_dir = "./indicadores_sql_csv"
os.makedirs(output_dir, exist_ok=True)

# Queries com aspas duplas para nomes de colunas e tabelas
# a. Produtos mais vendidos.
# b. Clientes com mais compras.
# c. Quantidade de vendas por dia.
# d. Quantidade de produtos distintos vendidos por dia.
# e. Produtos que concederam maior desconto.
queries = {
    "produtos_mais_vendidos.csv": """
        SELECT v."COD_ID_PRODUTO", p."DES_PRODUTO", SUM(CAST(v."VAL_QUANTIDADE_KG" AS numeric)) AS total_vendido
        FROM vendas v
        LEFT JOIN produtos p ON v."COD_ID_PRODUTO" = p."COD_ID_PRODUTO"
        GROUP BY v."COD_ID_PRODUTO", p."DES_PRODUTO"
        ORDER BY total_vendido DESC;
    """,
    "clientes_mais_compras.csv": """
        SELECT v."COD_ID_CLIENTE", c."NOM_NOME", COUNT(DISTINCT v."COD_ID_VENDA_UNICO") AS total_compras
        FROM vendas v
        LEFT JOIN clientes c ON v."COD_ID_CLIENTE" = c."COD_ID_CLIENTE"
        GROUP BY v."COD_ID_CLIENTE", c."NOM_NOME"
        ORDER BY total_compras DESC;
    """,
    "vendas_por_dia.csv": """
        SELECT "NUM_ANOMESDIA", COUNT(DISTINCT "COD_ID_VENDA_UNICO") AS qtde_vendas
        FROM vendas
        GROUP BY "NUM_ANOMESDIA"
        ORDER BY "NUM_ANOMESDIA";
    """,
    "produtos_distintos_por_dia.csv": """
        SELECT "NUM_ANOMESDIA", COUNT(DISTINCT "COD_ID_PRODUTO") AS qtde_produtos_distintos
        FROM vendas
        GROUP BY "NUM_ANOMESDIA"
        ORDER BY "NUM_ANOMESDIA";
    """,
    "produtos_maior_desconto.csv": """
        SELECT v."COD_ID_PRODUTO", p."DES_PRODUTO", SUM(CAST(v."VAL_VALOR_DESCONTO" AS numeric)) AS total_desconto
        FROM vendas v
        LEFT JOIN produtos p ON v."COD_ID_PRODUTO" = p."COD_ID_PRODUTO"
        GROUP BY v."COD_ID_PRODUTO", p."DES_PRODUTO"
        ORDER BY total_desconto DESC;
    """
}

def exportar_csv(nome_arquivo, query):
    with psycopg2.connect(**db_config) as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            colunas = [desc[0] for desc in cur.description]
            resultados = cur.fetchall()

    with open(os.path.join(output_dir, nome_arquivo), mode='w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(colunas)
        writer.writerows(resultados)

    print(f"✅ Exportado: {nome_arquivo}")

for arquivo, query in queries.items():
    exportar_csv(arquivo, query)


In [None]:
# 1. Inicializar Spark
spark = SparkSession.builder \
    .appName("PipelineUnificada") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.jars", "./postgresql-42.7.7.jar") \
    .getOrCreate()

# 2. Configurações do banco
db_config = {
    "url": "jdbc:postgresql://localhost:5432/bancopostgres",
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}

# 3. Função para ler tabelas
def ler_tabela(tabela):
    return spark.read.format("jdbc") \
        .option("url", db_config["url"]) \
        .option("dbtable", tabela) \
        .option("user", db_config["user"]) \
        .option("password", db_config["password"]) \
        .option("driver", db_config["driver"]) \
        .load()

# 4. Carregar tabelas
df_vendas = ler_tabela("vendas")
df_clientes = ler_tabela("clientes")
df_produtos = ler_tabela("produtos")

# ==================== AJUSTE CÓDIGO VENDA ====================
tamanho_cod_loja = 3
df_vendas = df_vendas.withColumn("COD_ID_LOJA_STR", col("COD_ID_LOJA").cast("string")) \
    .withColumn("prefixo_cod_venda", substring(col("COD_ID_VENDA_UNICO"), 1, tamanho_cod_loja)) \
    .withColumn("sufixo_cod_venda", substring(col("COD_ID_VENDA_UNICO"), tamanho_cod_loja + 1, 100)) \
    .withColumn("COD_ID_VENDA_UNICO",
        when(col("prefixo_cod_venda") != col("COD_ID_LOJA_STR"),
             concat(col("COD_ID_LOJA_STR"), col("sufixo_cod_venda")))
        .otherwise(col("COD_ID_VENDA_UNICO"))
    ) \
    .drop("prefixo_cod_venda", "sufixo_cod_venda", "COD_ID_LOJA_STR")

# ==================== MELHOR DIA DA SEMANA - TOP 20 CLIENTES ====================
# Converter NUM_ANOMESDIA (ex: 20230621) para data
df_vendas = df_vendas.withColumn("DATA_VENDA",
    to_date(col("NUM_ANOMESDIA").cast("string"), "yyyyMMdd")
)

# Top 20 clientes com mais compras
top_clientes = df_vendas.groupBy("COD_ID_CLIENTE") \
    .agg(count("COD_ID_VENDA_UNICO").alias("TOTAL_COMPRAS")) \
    .orderBy(desc("TOTAL_COMPRAS")) \
    .limit(20)

# Filtrar vendas dos top clientes e extrair dia da semana
df_top_vendas = df_vendas.join(top_clientes.select("COD_ID_CLIENTE"), on="COD_ID_CLIENTE", how="inner") \
    .withColumn("DIA_SEMANA", dayofweek(col("DATA_VENDA")))

# Contar vendas por cliente e dia da semana
compras_por_dia = df_top_vendas.groupBy("COD_ID_CLIENTE", "DIA_SEMANA") \
    .agg(count("COD_ID_VENDA_UNICO").alias("QTDE_VENDAS"))

# Ranking do melhor dia
window_cliente = Window.partitionBy("COD_ID_CLIENTE").orderBy(desc("QTDE_VENDAS"))
melhor_dia_cliente = compras_por_dia.withColumn("rank", row_number().over(window_cliente)) \
    .filter(col("rank") == 1) \
    .drop("rank")

# Juntar com nomes
resultado_melhor_dia = melhor_dia_cliente.join(df_clientes.select("COD_ID_CLIENTE", "NOM_NOME"),
                                               on="COD_ID_CLIENTE", how="left") \
    .orderBy(desc("QTDE_VENDAS"))

# ==================== VALIDAÇÃO CHAVES ESTRANGEIRAS ====================
clientes_faltando = df_vendas.select("COD_ID_CLIENTE").distinct() \
    .join(df_clientes.select("COD_ID_CLIENTE").distinct(),
          on="COD_ID_CLIENTE", how="left_anti") \
    .orderBy("COD_ID_CLIENTE")

produtos_faltando = df_vendas.select("COD_ID_PRODUTO").distinct() \
    .join(df_produtos.select("COD_ID_PRODUTO").distinct(),
          on="COD_ID_PRODUTO", how="left_anti") \
    .orderBy("COD_ID_PRODUTO")

# ==================== SALVAR RESULTADOS ====================
output_dir = "./indicadores_csv"
os.makedirs(output_dir, exist_ok=True)

resultado_melhor_dia.write.mode("overwrite").csv(os.path.join(output_dir, "melhor_dia_semana_top20_clientes.csv"), header=True)
clientes_faltando.write.mode("overwrite").csv(os.path.join(output_dir, "clientes_faltando.csv"), header=True)
produtos_faltando.write.mode("overwrite").csv(os.path.join(output_dir, "produtos_faltando.csv"), header=True)

print(f"✅ Resultados exportados para: {output_dir}")

# ==================== FINALIZAR ====================
spark.stop()

In [None]:
# --- script para fazer um replica do bancopostgres no mongoDB ---

# URI de conexão para o MongoDB
# 'localhost:27017' é o endereço padrão para uma instância local do MongoDB
mongo_uri = "mongodb://localhost:27017"

# Nome do banco de dados no MongoDB onde os dados serão escritos
mongo_db = "replica_bancomongo"

# URL de conexão para o PostgreSQL
# 'localhost:5432' é a porta padrão do PostgreSQL, 'bancopostgres' é o nome do banco de dados
pg_url = "jdbc:postgresql://localhost:5432/bancopostgres"

# Credenciais de usuário e senha para o PostgreSQL
pg_user = "postgres"
pg_password = "postgres"

# Nome do driver JDBC para PostgreSQL
pg_driver = "org.postgresql.Driver"

# Caminho para o arquivo JAR do driver JDBC PostgreSQL.
# O conector MongoDB Spark será baixado automaticamente via 'spark.jars.packages'.
postgresql_driver_jar = "/postgresql-42.7.7.jar"

# --- Inicialização da SparkSession ---

spark = SparkSession.builder \
    .appName("ReplicaPostgresMongo") \
    .config("spark.jars", postgresql_driver_jar) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.input.uri", mongo_uri) \
    .config("spark.mongodb.output.uri", mongo_uri) \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()


print("SparkSession inicializada com sucesso!")

# --- Função Auxiliar para Replicar Tabelas ---
def replicate_table(spark_session, table_name, mongo_db_name, mongo_collection_name):

    print(f"\n--- Replicando Tabela: {table_name} ---")
    print(f"Lendo dados da tabela '{table_name}' do PostgreSQL em: {pg_url}")

    try:
        # Lê os dados da tabela do PostgreSQL usando o formato JDBC.
        df = spark_session.read \
            .format("jdbc") \
            .option("url", pg_url) \
            .option("dbtable", table_name) \
            .option("user", pg_user) \
            .option("password", pg_password) \
            .option("driver", pg_driver) \
            .load()

        print(f"Dados da tabela '{table_name}' do PostgreSQL lidos com sucesso. Exemplo de esquema:")
        df.printSchema()

        if table_name != "vendas":
            print(f"Exemplo de 5 linhas dos dados lidos da tabela '{table_name}':")
            df.show(5)
        else:
            print(f"A tabela '{table_name}' pode ser grande. Não exibindo as primeiras 5 linhas para evitar problemas de memória.")


        print(f"Escrevendo dados no MongoDB na coleção '{mongo_collection_name}' do banco de dados '{mongo_db_name}'...")

        df.write \
            .format("com.mongodb.spark.sql.DefaultSource") \
            .mode("overwrite") \
            .option("database", mongo_db_name) \
            .option("collection", mongo_collection_name) \
            .save()

        print(f"Dados da tabela '{table_name}' replicados para o MongoDB com sucesso na coleção '{mongo_collection_name}'!")

    except Exception as e:
        print(f"Ocorreu um erro durante a replicação da tabela '{table_name}': {e}")
        raise

# --- Chamadas da Função de Replicação para cada Tabela ---
try:
    # Replicar tabela 'clientes'
    replicate_table(spark, "clientes", mongo_db, "clientes")

    # Replicar tabela 'produtos'
    replicate_table(spark, "produtos", mongo_db, "produtos")

    # Replicar tabela 'vendas'
    replicate_table(spark, "vendas", mongo_db, "vendas")

except Exception as e:
    print(f"\nUm erro geral ocorreu durante a execução do script: {e}")
finally:
    spark.stop()
    print("\nSparkSession encerrada.")

