# 1. Contexto de Negócio:

"House Flipping" é uma modalidade de investimento imobiliário, que compreende (i) a compra de imóveis descontados, (ii) reforma de custo reduzido com foco em valor de mercado, e (iii) posterior venda em curto espaço de tempo. Tradicionalmente esta operação é focada em imóveis residenciais.

A grande ideia por trás desse investimento é adquirir imóveis bastante descontados quando o vendedor tem urgência no negócio (casos de dívidas, divórcio, falecimento, necessidade de liberar capital, etc), priorizando excelente localização. Para o contexto do "House Flipping", boa localização seria aquela que permitiria o giro rápido do imóvel, ou seja, bairros de alta liquidez. 

Seguindo o princípio da oferta e da procura, bairros de alta liquidez nos centros urbanos são, em geral, aqueles com maior preço por metro quadrado. Por este motivo, uma análise dos preços de venda de imóveis é um excelente ponto de partida para um investidor desse tipo selecionar sua área de atuação.  

# 2. Objetivo da análise:

O objetivo desta análise é fazer um panorama geral de preços de venda de imóveis residenciais na cidade de São Paulo. 

Para este intuito, serão analisados os dados de todas as transações imobiliárias para as quais houve recolhimento de ITBI (Imposto sobre a Transmissão de Bens Imóveis), no período de 2022 a 2025 nessa cidade.

As principais perguntas que se propõe responder são:
- Quais são os top 10 bairros de maior preço por metro quadrado na cidade de São Paulo?
- Como tem sido a variação desses valores nos últimos 3 anos? Que bairros apresentam maior valorização?
- Que características dos imóveis (tipologia, número de quartos, vaga de garagem, com/sem elevador, etc) estão relacionadas a maior valorização deles?
- Existe sazonalidade no mercado imobiliário desta cidade? Quais são os meses mais indicados para compra descontada?
- Como é a aceitação dos vendedores em relação a financiamento? Em média, qual percentual do valor de venda é financiado? 

Observação: Inicialmente o objetivo era estudar a cidade de Juiz de Fora - MG, onde resido. No entanto, tal objetivo não foi possível pela falta de disponibilidade de informações granulares no portal de dados de ITBI dessa cidade.

# 3. Busca e coleta de dados

## 3.1 Fontes de Dados

### 3.1.1 Dados ITBI

A fonte dos dados de ITBI utilizados nesta análise é a Prefeitura de São Paulo, através do endereço: <https://prefeitura.sp.gov.br/web/fazenda/w/acesso_a_informacao/31501>

**URL das bases de dados em formato xlsx:**

2025: https://prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/itbi/GUIAS%20DE%20ITBI%20PAGAS%20%2825112025%29.xlsx

2024: https://prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/itbi/GUIAS-DE-ITBI-PAGAS-2024.xlsx

2023: https://www.prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/XLSX/GUIAS-DE-ITBI-PAGAS-2023.xlsx

2022: https://www.prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/XLSX/GUIAS_DE_ITBI_PAGAS_12-2022.xlsx 

Cada uma das URLs acima descritas apresenta os dados de um ano calendário. Dentro do arquivo xlsx os dados de cada um dos meses são reportados em abas diferentes. Para o ano de 2025 estão disponíveis dados até o mês de outubro.

As últimas abas dos arquivos contém informações complementares que serão utilizadas no glossário de dados e nas tabelas dimensão que serão descritas no capítulo de modelagem. 


### 3.1.2 Dados de CEP

As informações a respeito dos CEPs foram obtidas no site CEP Aberto, disponível em: https://www.cepaberto.com/downloads/new

Cidades e Municípios: https://www.cepaberto.com/downloads.csv?name=cities

Estados: https://www.cepaberto.com/downloads.csv?name=states


CEPs do estado de São Paulo:

https://www.cepaberto.com/downloads.csv?name=SP&part=1

https://www.cepaberto.com/downloads.csv?name=SP&part=2

https://www.cepaberto.com/downloads.csv?name=SP&part=3

https://www.cepaberto.com/downloads.csv?name=SP&part=4

https://www.cepaberto.com/downloads.csv?name=SP&part=5



O armazenamento dessas bases em nuvem está descrito nos scripts abaixo.

## 3.2 Ingestão de dados e criação de tabelas

### 3.2.1 Tabela consolidada com dados de ITBI (2022 a 2025):

In [0]:
#Instalar biblioteca openpyxl para ler arquivo em formato xlsx
 
%pip install openpyxl

In [0]:
#Reinicia o ambiente Python para garantir que a biblioteca esteja disponível

%restart_python

In [0]:
# Cria tabela com dados de ITBI 2025

import requests
import pandas as pd
import re

url = (
    'https://prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/itbi/'
    'GUIAS%20DE%20ITBI%20PAGAS%20%2825112025%29.xlsx'
)

# Baixa o arquivo da url e salva em um arquivo temporário no cluster
response = requests.get(url)
with open('/tmp/seuarquivo.xlsx', 'wb') as f:
    f.write(response.content)

# Lê as 10 primeiras abas do arquivo Excel (jan a out) para gerar um dicionário Python (chave - valor), contendo nome da aba e conteúdo gravado em um dataframe
sheets = pd.read_excel(
    '/tmp/seuarquivo.xlsx',
    sheet_name=list(range(10))
)

# Seleciona apenas as 28 primeiras colunas de cada aba (dados para além dessas são lixo) e guarda em uma lista de dataframes
dfs = [df.iloc[:, :28].copy() for df in sheets.values()]

# Copia o header do primeiro df em todos os demais para conseguir concatená-los (dados originais tem headers despadronizados)
columns = dfs[0].columns.tolist()
for df in dfs:
    df.columns = columns
pdf_all = pd.concat(dfs, ignore_index=True)

# Converte todas as colunas que contêm texto para tipo de dado string
for col in pdf_all.select_dtypes(include=['object']).columns:
    pdf_all[col] = pdf_all[col].astype(str)

#Define função que limpa os nomes das colunas: tira espaços e caracteres especiais
def clean_column(name):
    return re.sub(r'[ ,;{}()\n\t=/\.]', '_', str(name)).replace('°', 'o').replace('%', 'pct')

#Chama a função clean_column definida anterioremente para limpar os nomes das colunas de pdf_all, em preparação para convertê-lo para DataFrame do Spark
pdf_all.columns = [clean_column(col) for col in pdf_all.columns]

# Converte o DataFrame do Pandas para um DataFrame do Spark
df = spark.createDataFrame(pdf_all)

# Deleta a tabela, caso já exista
table_name = "mvp_engdados_puc.bronze.guias_itbi_2025"
spark.sql(f"DROP TABLE IF EXISTS {table_name} PURGE")

# Conta número de linhas do DataFrame e mostra 100 linhas dele
num_linhas = df.count()
print(f"Número de linhas no Spark: {num_linhas}")
display(df.limit(100))

# Salva o DataFrame do Spark em uma tabela persistida no DBFS
df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(table_name)

# Os headers de cada aba do Excel original estão misturados no corpo dos dados. Esta limpeza será feita na etapa de qualidade.

In [0]:

# Processamento idem anterior para criar as tabelas de itbi pagos de 2022 a 20224

import requests
import pandas as pd
import re

urls = [
    'https://prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/itbi/GUIAS-DE-ITBI-PAGAS-2024.xlsx',
    'https://www.prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/XLSX/GUIAS-DE-ITBI-PAGAS-2023.xlsx',
    'https://www.prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/XLSX/GUIAS_DE_ITBI_PAGAS_12-2022.xlsx',
]

#Define função que limpa os nomes das colunas: tira espaços e caracteres especiais
def clean_column(name):
    return re.sub(r'[ ,;{}()\n\t=/\.]', '_', str(name)).replace('°', 'o').replace('%', 'pct')

# Para os anos de 2022 a 2024:
for idx, url in enumerate(urls, start=1):
    
    # Baixa o arquivo da url e salva em um arquivo temporário no cluster
    response = requests.get(url)
    file_path = f'/tmp/seuarquivo_{idx}.xlsx'
    with open(file_path, 'wb') as f:
        f.write(response.content)

    # Lê as 12 primeiras abas do arquivo Excel (jan a dez)
    sheets = pd.read_excel(
        file_path,
        sheet_name=list(range(12))
    )

    # Seleciona apenas as 28 primeiras colunas de cada aba (dados para além dessas são lixo)
    dfs = [df.iloc[:, :28].copy() for df in sheets.values()]

    # Copia um header selecionado padrão em todos os demais para conseguir concatená-los (dados originais tem headers despadronizados)
    columns = dfs[1].columns.tolist()
    for df in dfs:
        df.columns = columns
    pdf_all = pd.concat(dfs, ignore_index=True)

    # Converte todas as colunas que contêm texto para tipo de dado string
    for col in pdf_all.select_dtypes(include=['object']).columns:
        pdf_all[col] = pdf_all[col].astype(str)

    #Chama a função clean_column definida anteriormente para limpar os nomes das colunas de pdf_all, em preparação para convertê-lo para DataFrame do Spark
    pdf_all.columns = [clean_column(col) for col in pdf_all.columns]

    # Converte o DataFrame do Pandas para um DataFrame do Spark
    df = spark.createDataFrame(pdf_all)

    # Deleta a tabela, caso já exista
    table_name = f"mvp_engdados_puc.bronze.guias_itbi_{idx}"
    spark.sql(f"DROP TABLE IF EXISTS {table_name} PURGE")

    # Conta número de linhas do DataFrame e mostra 100 linhas dele
    num_linhas = df.count()
    print(f"Número de linhas no Spark: {num_linhas}")
    display(df.limit(100))

    # Salva o DataFrame do Spark em uma tabela persistida no DBFS
    df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(table_name)

# Os headers de cada aba do Excel original estão misturados no corpo dos dados. Esta limpeza será feita na etapa de qualidade.


In [0]:
%sql
/* Criação da tabela agregada contendo todos os dados de itbi desde 2022 */

CREATE OR REPLACE TABLE mvp_engdados_puc.bronze.guias_itbi_bronze AS

SELECT 
  try_cast(No_do_Cadastro__SQL_ AS BIGINT) AS No_do_Cadastro__SQL_,
  Nome_do_Logradouro,
  try_cast(`Número` AS BIGINT) AS `Número`,
  Complemento,
  Bairro,
  `Referência`,
  try_cast(CEP AS DOUBLE) AS CEP,
  `Natureza_de_Transação`,
  try_cast(`Valor_de_Transação__declarado_pelo_contribuinte_` AS DOUBLE) AS `Valor_de_Transação__declarado_pelo_contribuinte_`,
  try_cast(`Data_de_Transação` AS DATE) AS `Data_de_Transação`,
  try_cast(`Valor_Venal_de_Referência` AS DOUBLE) AS `Valor_Venal_de_Referência`,
  try_cast(`Proporção_Transmitida__pct_` AS DOUBLE) AS `Proporção_Transmitida__pct_`,
  try_cast(`Valor_Venal_de_Referência__proporcional_` AS DOUBLE) AS `Valor_Venal_de_Referência__proporcional_`,
  try_cast(`Base_de_Cálculo_adotada` AS DOUBLE) AS `Base_de_Cálculo_adotada`,
  Tipo_de_financiamento,
  try_cast(Valor_Financiado AS DOUBLE) AS Valor_Financiado,
  `Cartório_de_Registro`,
  try_cast(`Matrícula_do_Imóvel` AS BIGINT) AS `Matrícula_do_Imóvel`,
  `Situação_do_SQL`,
  try_cast(`Área_do_Terreno__m2_` AS BIGINT) AS `Área_do_Terreno__m2_`,
  try_cast(Testada__m_ AS BIGINT) AS Testada__m_,
  try_cast(`Fração_Ideal` AS DOUBLE) AS `Fração_Ideal`,
  try_cast(`Área_Construída__m2_` AS BIGINT) AS `Área_Construída__m2_`,
  try_cast(Uso__IPTU_ AS BIGINT) AS Uso__IPTU_,
  `Descrição_do_uso__IPTU_`,
  try_cast(`Padrão__IPTU_` AS BIGINT) AS `Padrão__IPTU_`,
  `Descrição_do_Padrão__IPTU_`,
  try_cast(ACC__IPTU_ AS STRING) AS ACC__IPTU_
  
FROM mvp_engdados_puc.bronze.guias_itbi_2025

UNION ALL
SELECT 
try_cast(No_do_Cadastro__SQL_ AS BIGINT) AS No_do_Cadastro__SQL_,
  Nome_do_Logradouro,
  try_cast(`Número` AS BIGINT) AS `Número`,
  Complemento,
  Bairro,
  `Referência`,
  try_cast(CEP AS DOUBLE) AS CEP,
  `Natureza_de_Transação`,
  try_cast(`Valor_de_Transação__declarado_pelo_contribuinte_` AS DOUBLE) AS `Valor_de_Transação__declarado_pelo_contribuinte_`,
  try_cast(`Data_de_Transação` AS DATE) AS `Data_de_Transação`,
  try_cast(`Valor_Venal_de_Referência` AS DOUBLE) AS `Valor_Venal_de_Referência`,
  try_cast(`Proporção_Transmitida__pct_` AS DOUBLE) AS `Proporção_Transmitida__pct_`,
  try_cast(`Valor_Venal_de_Referência__proporcional_` AS DOUBLE) AS `Valor_Venal_de_Referência__proporcional_`,
  try_cast(`Base_de_Cálculo_adotada` AS DOUBLE) AS `Base_de_Cálculo_adotada`,
  Tipo_de_financiamento,
  try_cast(Valor_Financiado AS DOUBLE) AS Valor_Financiado,
  `Cartório_de_Registro`,
  try_cast(`Matrícula_do_Imóvel` AS BIGINT) AS `Matrícula_do_Imóvel`,
  `Situação_do_SQL`,
  try_cast(`Área_do_Terreno__m2_` AS BIGINT) AS `Área_do_Terreno__m2_`,
  try_cast(Testada__m_ AS BIGINT) AS Testada__m_,
  try_cast(`Fração_Ideal` AS DOUBLE) AS `Fração_Ideal`,
  try_cast(`Área_Construída__m2_` AS BIGINT) AS `Área_Construída__m2_`,
  try_cast(Uso__IPTU_ AS BIGINT) AS Uso__IPTU_,
  `Descrição_do_uso__IPTU_`,
  try_cast(`Padrão__IPTU_` AS BIGINT) AS `Padrão__IPTU_`,
  ACC__IPTU_ AS `Descrição_do_Padrão__IPTU_`, 
  try_cast(ACC__IPTU__1 AS STRING) AS ACC__IPTU_

FROM mvp_engdados_puc.bronze.guias_itbi_1

UNION ALL
SELECT 
try_cast(No_do_Cadastro__SQL_ AS BIGINT) AS No_do_Cadastro__SQL_,
  Nome_do_Logradouro,
  try_cast(`Número` AS BIGINT) AS `Número`,
  Complemento,
  Bairro,
  `Referência`,
  try_cast(CEP AS DOUBLE) AS CEP,
  `Natureza_de_Transação`,
  try_cast(`Valor_de_Transação__declarado_pelo_contribuinte_` AS DOUBLE) AS `Valor_de_Transação__declarado_pelo_contribuinte_`,
  try_cast(`Data_de_Transação` AS DATE) AS `Data_de_Transação`,
  try_cast(`Valor_Venal_de_Referência` AS DOUBLE) AS `Valor_Venal_de_Referência`,
  try_cast(`Proporção_Transmitida__pct_` AS DOUBLE) AS `Proporção_Transmitida__pct_`,
  try_cast(`Valor_Venal_de_Referência__proporcional_` AS DOUBLE) AS `Valor_Venal_de_Referência__proporcional_`,
  try_cast(`Base_de_Cálculo_adotada` AS DOUBLE) AS `Base_de_Cálculo_adotada`,
  Tipo_de_financiamento,
  try_cast(Valor_Financiado AS DOUBLE) AS Valor_Financiado,
  `Cartório_de_Registro`,
  try_cast(`Matrícula_do_Imóvel` AS BIGINT) AS `Matrícula_do_Imóvel`,
  `Situação_do_SQL`,
  try_cast(`Área_do_Terreno__m2_` AS BIGINT) AS `Área_do_Terreno__m2_`,
  try_cast(Testada__m_ AS BIGINT) AS Testada__m_,
  try_cast(`Fração_Ideal` AS DOUBLE) AS `Fração_Ideal`,
  try_cast(`Área_Construída__m2_` AS BIGINT) AS `Área_Construída__m2_`,
  try_cast(Uso__IPTU_ AS BIGINT) AS Uso__IPTU_,
  `Descrição_do_uso__IPTU_`,
  try_cast(`Padrão__IPTU_` AS BIGINT) AS `Padrão__IPTU_`,
  `Descrição_do_Padrão__IPTU_`,
  try_cast(ACC__IPTU_ AS STRING) AS ACC__IPTU_ 

FROM mvp_engdados_puc.bronze.guias_itbi_2

UNION ALL
SELECT 
try_cast(No_do_Cadastro__SQL_ AS BIGINT) AS No_do_Cadastro__SQL_,
  Nome_do_Logradouro,
  try_cast(`Número` AS BIGINT) AS `Número`,
  Complemento,
  Bairro,
  `Referência`,
  try_cast(CEP AS DOUBLE) AS CEP,
  `Natureza_de_Transação`,
  try_cast(`Valor_de_Transação__declarado_pelo_contribuinte_` AS DOUBLE) AS `Valor_de_Transação__declarado_pelo_contribuinte_`,
  try_cast(`Data_de_Transação` AS DATE) AS `Data_de_Transação`,
  try_cast(`Valor_Venal_de_Referência` AS DOUBLE) AS `Valor_Venal_de_Referência`,
  try_cast(`Proporção_Transmitida__pct_` AS DOUBLE) AS `Proporção_Transmitida__pct_`,
  try_cast(`Valor_Venal_de_Referência__proporcional_` AS DOUBLE) AS `Valor_Venal_de_Referência__proporcional_`,
  try_cast(`Base_de_Cálculo_adotada` AS DOUBLE) AS `Base_de_Cálculo_adotada`,
  Tipo_de_financiamento,
  try_cast(Valor_Financiado AS DOUBLE) AS Valor_Financiado,
  `Cartório_de_Registro`,
  try_cast(`Matrícula_do_Imóvel` AS BIGINT) AS `Matrícula_do_Imóvel`,
  `Situação_do_SQL`,
  try_cast(`Área_do_Terreno__m2_` AS BIGINT) AS `Área_do_Terreno__m2_`,
  try_cast(Testada__m_ AS BIGINT) AS Testada__m_,
  try_cast(`Fração_Ideal` AS DOUBLE) AS `Fração_Ideal`,
  try_cast(`Área_Construída__m2_` AS BIGINT) AS `Área_Construída__m2_`,
  try_cast(Uso__IPTU_ AS BIGINT) AS Uso__IPTU_,
  `Descrição_do_uso__IPTU_`,
  try_cast(`Padrão__IPTU_` AS BIGINT) AS `Padrão__IPTU_`,
  ACC__IPTU_ `Descrição_do_Padrão__IPTU_`,
  try_cast(ACC__IPTU__1 AS STRING) AS ACC__IPTU_
  
FROM mvp_engdados_puc.bronze.guias_itbi_3

In [0]:
%sql
/* Visualizacão da tabela criada */

SELECT * FROM mvp_engdados_puc.bronze.guias_itbi_bronze
LIMIT 100

In [0]:
%sql
/* Contador do número de linhas da tabela de itbi, camada bronze */
SELECT COUNT(*) FROM mvp_engdados_puc.bronze.guias_itbi_bronze

### 3.2.2 Tabela de dados de Uso do imóvel segundo o IPTU:

In [0]:
# Cria tabela com dados da dimensão Uso do IPTU

import requests
import pandas as pd
import re

url = (
    'https://prefeitura.sp.gov.br/cidade/secretarias/upload/fazenda/arquivos/itbi/'
    'GUIAS%20DE%20ITBI%20PAGAS%20%2825112025%29.xlsx'
)

# Baixa o arquivo da url e salva em um arquivo temporário no cluster
response = requests.get(url)
with open('/tmp/seuarquivo.xlsx', 'wb') as f:
    f.write(response.content)

# Lê apenas a aba chamada "Tabela de USOS"
df_usos = pd.read_excel(
    '/tmp/seuarquivo.xlsx',
    sheet_name="Tabela de USOS"
)

# Seleciona apenas as 2 primeiras colunas, pois se existirem mais colunas são lixo
df_usos = df_usos.iloc[:, :2].copy()

# Converte todas as colunas que contêm texto para tipo de dado string
for col in df_usos.select_dtypes(include=['object']).columns:
    df_usos[col] = df_usos[col].astype(str)

# Converte o DataFrame do Pandas para um DataFrame do Spark
df_spark_usos = spark.createDataFrame(df_usos)

# Deleta a tabela, caso já exista
table_name_usos = "mvp_engdados_puc.bronze.tabela_dim_usos"
spark.sql(f"DROP TABLE IF EXISTS {table_name_usos} PURGE")

# Conta número de linhas do DataFrame e mostra no máximo 100 linhas dele
num_linhas_usos = df_spark_usos.count()
print(f"Número de linhas no Spark: {num_linhas_usos}")
display(df_spark_usos.limit(100))

# Salva o DataFrame do Spark em uma tabela persistida no DBFS
df_spark_usos.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(table_name_usos)

In [0]:
%sql
/* Visualizacão da tabela criada */

SELECT * FROM mvp_engdados_puc.bronze.tabela_dim_usos

### 3.2.3 Tabela com dados dos CEPs de São Paulo:

A ingestão dos dados de CEP foi feita manualmente pelo Unit Catalog. Evidências abaixo:

![](/Workspace/Users/izabelagreg@gmail.com/mvp-puc-eng-dados/Figuras/Ingestao_UnitCatalog_1)

![](/Workspace/Users/izabelagreg@gmail.com/mvp-puc-eng-dados/Figuras/Ingestao_UnitCatalog_2)

![](/Workspace/Users/izabelagreg@gmail.com/mvp-puc-eng-dados/Figuras/Ingestao_UnitCatalog_3)

![](/Workspace/Users/izabelagreg@gmail.com/mvp-puc-eng-dados/Figuras/Ingestao_UnitCatalog_4)

In [0]:
# Criação da tabela com todos os CEPs do estado de São Paulo
import zipfile
from functools import reduce

zip_paths = [
    "/Volumes/mvp_engdados_puc/staging/bases_cep_sp/sp.cepaberto_parte_1.zip",
    "/Volumes/mvp_engdados_puc/staging/bases_cep_sp/sp.cepaberto_parte_2.zip",
    "/Volumes/mvp_engdados_puc/staging/bases_cep_sp/sp.cepaberto_parte_3.zip",
    "/Volumes/mvp_engdados_puc/staging/bases_cep_sp/sp.cepaberto_parte_4.zip",
    "/Volumes/mvp_engdados_puc/staging/bases_cep_sp/sp.cepaberto_parte_5.zip"
]
staging_dir = "/Volumes/mvp_engdados_puc/staging/bases_cep_sp"

csv_paths = []
# Para cada um dos arquivos zip salvos em staging_dir, extrai o único arquivo CSV que ele contém e salva em csv_paths
for zip_path in zip_paths:    
    with zipfile.ZipFile(zip_path, 'r') as arquivo_zip:
        nome_do_csv = arquivo_zip.namelist()[0]
        arquivo_zip.extract(nome_do_csv, staging_dir)
        csv_paths.append(f"{staging_dir}/{nome_do_csv}")

# Header para os CSVs
colunas_cep = ["cep", "logradouro", "descricao", "bairro", "cidade", "uf"]

# Lê cada um dos CSV, que não tem header, forçando o esquema indicado em colunas_cep
df_list = [
    spark.read.csv(
        path,
        sep=",",
        header=False,
        inferSchema=False
    ).toDF(*colunas_cep)
    for path in csv_paths
]

# Lê a lista de dataframes armazenados em df_list e une todos em um só
df_cep_unido = reduce(lambda df1, df2: df1.unionByName(df2), df_list)

# Deleta a tabela, caso já exista
table_name_cep = "mvp_engdados_puc.bronze.cepaberto_SP"
spark.sql(f"DROP TABLE IF EXISTS {table_name_cep} PURGE")

# Salva o DataFrame do Spark em uma tabela Delta persistida no DBFS
df_cep_unido.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(table_name_cep)

# Comando para ler 100 linhas da tabela Delta criada
display(spark.table(table_name_cep).limit(100))

In [0]:
# Criação da tabela de códigos das cidades (base CEP)
import zipfile

zip_path = "/Volumes/mvp_engdados_puc/staging/bases_cep_sp/cidades.cepaberto.zip"
staging_dir = "/Volumes/mvp_engdados_puc/staging/bases_cep_sp"

# Extrai o único arquivo CSV contido no zip_path e salva no diretório
with zipfile.ZipFile(zip_path, 'r') as arquivo_zip:
    nome_do_csv = arquivo_zip.namelist()[0]
    arquivo_zip.extract(nome_do_csv, staging_dir)
    csv_path = f"{staging_dir}/{nome_do_csv}"

# Header para o CSV
colunas_cidades = ["codigo_cidade", "nome_cidade", "codigo_estado"]

# Lê o CSV, que não tem header, forçando o esquema indicado em colunas_cidades
df_cidades = spark.read.csv(
    csv_path,
    sep=",",
    header=False,
    inferSchema=False
).toDF(*colunas_cidades)

# Deleta a tabela, caso já exista
table_name = "mvp_engdados_puc.bronze.cepaberto_cidades"
spark.sql(f"DROP TABLE IF EXISTS {table_name} PURGE")

# Salva o DataFrame do Spark em uma tabela Delta persistida no DBFS
df_cidades.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable(table_name)

# Comando para ler 100 linhas da tabela Delta criada
display(spark.table(table_name).limit(100))