# Instanciar Spark

In [1]:
# Importações do PySpark e SQLalchemy
import pyspark.sql.functions as f
from pyspark.sql.functions import col, from_json, explode, expr, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType
from pyspark.sql import SparkSession
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy.dialects.postgresql import insert
import psycopg2

# Configuração da sessão Spark
spark_session = SparkSession.builder \
    .appName("Ibge_silver") \
    .config("spark.master", "local[*]") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.1") \
    .config("spark.executor.instances", "3") \
    .getOrCreate()

# PANDAS 


In [2]:
def call_upsert(table, conn, keys, data_iter):
    """
    Função para atualizar os dados na tabela usando a lógica de upsert.

    Args:
        table (sqlalchemy.Table): Tabela alvo para upsert.
        conn (sqlalchemy.engine.Connection): Conexão com o banco de dados.
        keys (list): Lista das chaves da tabela.
        data_iter (iterable): Iterável contendo dados a serem inseridos/atualizados.

    Returns:
        None

    Example:
        keys = ['id', 'name']
        data_iter = [(1, 'John'), (2, 'Jane')]

        # Tabela alvo 'users' com colunas 'id' e 'name'
        users_table = sqlalchemy.Table('users', metadata, autoload_with=engine)

        # Conexão com o banco de dados
        db_conn = engine.connect()

        # Chamada à função de upsert
        call_upsert(users_table, db_conn, keys, data_iter)

        # Fechar a conexão
        db_conn.close()
    """
    data = [dict(zip(keys, row)) for row in data_iter]

    # Construir a declaração de inserção
    insert_statement = insert(table).values(data)

    # Construir a declaração de upsert
    upsert_statement = insert_statement.on_conflict_do_update(
        constraint=f"{table.name}_pkey",
        set_={c.key: c for c in insert_statement.excluded},
    )

    # Executar a declaração de upsert
    conn.execute(upsert_statement)


# SQL CONNECTION

In [3]:
# Configurações de conexão com o PostgreSQL
# Este item é totalmente editável, para uma conexão de sua preferência (Vamos fazer conforme o docker-compose que subimos)

host = 'localhost'
port = '8085'

bronze_url = f"jdbc:postgresql://{host}:{port}/bronze_data"
silver_url = f"jdbc:postgresql://{host}:{port}/silver_data"
properties = {"user": "ibge", "password": "ibge", "driver": "org.postgresql.Driver"}

# Criar um engine do SQLAlchemy
bronze_engine = create_engine(f"postgresql://{properties['user']}:{properties['password']}@{host}:{port}/bronze_data")
silver_engine = create_engine(f"postgresql://{properties['user']}:{properties['password']}@{host}:{port}/silver_data")




# REIGAO

In [4]:
# INSERÇÃO DA DIMENSÃO REGIÃO USANDO PANDAS

# Definir a consulta SQL
query = 'SELECT id_regiao,  regiao FROM public.regiao'


# Upinset dos dados via pandas a cada 10000 linhas
for data_frame in pd.read_sql(query,bronze_engine,chunksize=10000):
    try:
        pd.DataFrame(data_frame).to_sql(
            'regiao', 
            silver_engine, 
            schema='public', 
            index=False, 
            if_exists="append", 
            method=call_upsert
        )
        print("UpInsert executado")
    except Exception as e :
        print(f'Exceção {str(e)}')            

Exceção subject table for an INSERT, UPDATE or DELETE expected, got <pandas.io.sql.SQLTable object at 0x000001E6CBC76000>.


In [5]:
# INSERÇÃO DA DIMENSÃO ESTADO USANDO PANDAS

# Definir a consulta SQL
query = """
select
	id_estado,
	uf,
	estado,
	1 as latitude,
	1 as longitude,
	id_regiao as id_regiao
from
	public.estado"""

# Upinset dos dados via pandas a cada 10000 linhas
for data_frame in pd.read_sql(query,bronze_engine,chunksize=10000):
    try:
        pd.DataFrame(data_frame).to_sql(
            'estado', 
            silver_engine, 
            schema='public', 
            index=False, 
            if_exists="append", 
            method=call_upsert
        )
        print("UpInsert executado")
    except Exception as e :
        print(f'Exceção {str(e)}')      

Exceção subject table for an INSERT, UPDATE or DELETE expected, got <pandas.io.sql.SQLTable object at 0x000001E6CA3AF6B0>.


In [6]:
# INSERÇÃO DA DIMENSÃO ESTADO USANDO PANDAS


# Definir a consulta SQL
sql_municipio = """
    select
	m.*,
	case when c.capital is not null then true else false end as flag_capital
from
	public.capitais c
right join
	municipios m 
	on
	c.uf = m.uf
	and c.capital = m.municipio
"""

# Upinset dos dados via pandas a cada 10000 linhas
for data_frame in pd.read_sql(sql_municipio,bronze_engine,chunksize=10000):
    try:
        pd.DataFrame(data_frame).to_sql(
            'municipio', 
            silver_engine, 
            schema='public', 
            index=False, 
            if_exists="append", 
            method=call_upsert
        )
        print("UpInsert executado")
    except Exception as e :
        print(f'Exceção {str(e)}')   


Exceção subject table for an INSERT, UPDATE or DELETE expected, got <pandas.io.sql.SQLTable object at 0x000001E6CBC77470>.


# PESQUISAS (leitura)

In [7]:
# IDH

# leitura dos dados via odbc
df_pesquisas_idh = spark_session.read.format("jdbc") \
    .option("url", bronze_url) \
    .option("driver", properties["driver"]) \
    .option("dbtable", "pesquisas_idh") \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .load()

# Definir a consulta SQL
sql_query = """
    select
	e.id_estado ,
	m.id_municipio as id_capital
from
	public.capitais c
inner join
	municipios m 
on
	c.uf = m.uf
	and c.capital = m.municipio
inner join
	public.estado e 
on
	c.uf  = e.uf 
"""

# Ler os dados do banco de dados usando Spark
df_estado_municipio = spark_session.read.jdbc(url=bronze_url, table=f"({sql_query}) as subquery", properties=properties)


# Realize o left join
df_idh_final = df_pesquisas_idh.join(
    df_estado_municipio,
    df_pesquisas_idh["cd_estado"] == df_estado_municipio["id_estado"],
    how="left"
)

# seleciona os dados aplicando os tipos
df_idh_final = df_idh_final.select(
    col('cd_pesquisa').astype(StringType()),
    col('id_capital').alias('municipio').astype(IntegerType()),
    col('cd_estado').astype(IntegerType()),
    col('ano').astype(IntegerType()),
    col('valor').astype(StringType())
)

# Mostrar o DataFrame resultante
df_idh_final.show()


+-----------+---------+---------+----+-----+
|cd_pesquisa|municipio|cd_estado| ano|valor|
+-----------+---------+---------+----+-----+
|      30255|  2927408|       29|1991|0.386|
|      30255|  2927408|       29|2000|0.512|
|      30255|  5103403|       51|1991|0.449|
|      30255|  5103403|       51|2000|0.601|
|      30255|  3304557|       33|1991|0.573|
|      30255|  3304557|       33|2000|0.664|
|      30255|  3304557|       33|2010|0.761|
|      30255|  3304557|       33|2012|0.762|
|      30255|  3304557|       33|2013|0.768|
|      30255|  3304557|       33|2014| 0.78|
|      30255|  3304557|       33|2015|0.785|
|      30255|  3304557|       33|2016|0.789|
|      30255|  3304557|       33|2017|0.791|
|      30255|  3304557|       33|2018|0.805|
|      30255|  3304557|       33|2019|0.809|
|      30255|  3304557|       33|2020|0.785|
|      30255|  3304557|       33|2021|0.762|
|      30255|  2800308|       28|1991|0.408|
|      30255|  1600303|       16|1991|0.472|
|      302

In [8]:
# POPULACAO

# leitura dos dados via odbc
df_pesquisas_populacao = spark_session.read.format("jdbc") \
    .option("url", bronze_url) \
    .option("driver", properties["driver"]) \
    .option("dbtable", "pesquisas_populacao") \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .load()


# seleciona os dados aplicando os tipos
df_pesquisas_populacao = df_pesquisas_populacao.select(
    col('cd_pesquisa').astype(StringType()),
    col('municipio').astype(IntegerType()),
    lit(None).alias('cd_estado').astype(IntegerType()),
    col('ano').astype(IntegerType()),
    col('valor').astype(StringType())
)

# Mostrar o DataFrame resultante
df_pesquisas_populacao.show()

+-----------+---------+---------+----+------+
|cd_pesquisa|municipio|cd_estado| ano| valor|
+-----------+---------+---------+----+------+
|  9324_6579|  3500105|     NULL|2018| 35023|
|  9324_6579|  3500204|     NULL|2018|  3571|
|  9324_6579|  3500303|     NULL|2018| 35954|
|  9324_6579|  3500402|     NULL|2018|  8137|
|  9324_6579|  3500501|     NULL|2018| 18599|
|  9324_6579|  3500550|     NULL|2018|  6040|
|  9324_6579|  3500600|     NULL|2018|  3380|
|  9324_6579|  3500709|     NULL|2018| 37023|
|  9324_6579|  3500758|     NULL|2018|  5918|
|  9324_6579|  3500808|     NULL|2018|  4147|
|  9324_6579|  3500907|     NULL|2018|  4134|
|  9324_6579|  3501004|     NULL|2018| 16164|
|  9324_6579|  3501103|     NULL|2018|  4110|
|  9324_6579|  3501152|     NULL|2018| 18484|
|  9324_6579|  3501202|     NULL|2018|  3712|
|  9324_6579|  3501301|     NULL|2018| 24830|
|  9324_6579|  3501400|     NULL|2018|  5179|
|  9324_6579|  3501509|     NULL|2018|  3206|
|  9324_6579|  3501608|     NULL|2

In [9]:
# PIB

# leitura dos dados via odbc
df_pesquisas_pib = spark_session.read.format("jdbc") \
    .option("url", bronze_url) \
    .option("driver", properties["driver"]) \
    .option("dbtable", "pesquisas_pib") \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .load()

# seleciona os dados aplicando os tipos
df_pesquisas_pib = df_pesquisas_pib.select(
    col('cd_pesquisa').astype(StringType()),
    col('municipio').astype(IntegerType()),
    lit(None).alias('cd_estado').astype(IntegerType()),
    col('ano').astype(IntegerType()),
    col('valor').astype(StringType())
)

# Mostrar o DataFrame resultante
df_pesquisas_pib.show()

+-----------+---------+---------+----+--------+
|cd_pesquisa|municipio|cd_estado| ano|   valor|
+-----------+---------+---------+----+--------+
|    37_5938|  3500105|     NULL|2019| 1191134|
|    37_5938|  3500204|     NULL|2019|  112493|
|    37_5938|  3500303|     NULL|2019| 1046725|
|    37_5938|  3500402|     NULL|2019|  172428|
|    37_5938|  3500501|     NULL|2019|  530696|
|    37_5938|  3500550|     NULL|2019|  174587|
|    37_5938|  3500600|     NULL|2019|  146025|
|    37_5938|  3500709|     NULL|2019| 2146823|
|    37_5938|  3500758|     NULL|2019|  105393|
|    37_5938|  3500808|     NULL|2019|   87021|
|    37_5938|  3500907|     NULL|2019|   90689|
|    37_5938|  3501004|     NULL|2019|  536419|
|    37_5938|  3501103|     NULL|2019|  121525|
|    37_5938|  3501152|     NULL|2019| 2306521|
|    37_5938|  3501202|     NULL|2019|  101240|
|    37_5938|  3501301|     NULL|2019|  547949|
|    37_5938|  3501400|     NULL|2019|   50648|
|    37_5938|  3501509|     NULL|2019|  

# Pesquisas (Tratamento)

In [10]:
# Une todas as pesquisas
df_pesquisas = df_pesquisas_populacao.union(df_idh_final).union(df_pesquisas_pib)

# Grava as pesquias no banco
df_pesquisas.write.format("jdbc").option("url", silver_url) \
.option("user", "ibge") \
.option("password", "ibge") \
.option("dbtable", "public.pesquisas") \
.option("truncate", True) \
.mode('overwrite') \
.save()



