In [None]:
from dotenv import load_dotenv

load_dotenv()
import pandas as pd
import geopandas as gpd
import glob
import os
import psycopg2
from sqlalchemy import create_engine, text
from sqlalchemy.types import Integer
from tqdm import tqdm
from traceback import format_exc

Configurações e alguns arquivos básicos

In [None]:
chunk_size = 1000

DB_CONFIGS = {
    "DB_HOST": os.getenv("DB_HOST", "localhost"),
    "DB_PORT": os.getenv("DB_PORT", None),
    "DB_USER": os.getenv("DB_USER", "postgres"),
    "DB_PASSWORD": os.getenv("DB_PASSWORD", "postgres"),
    "DB_NAME": os.getenv("DB_NAME", "dbname"),
}

campos_pessoas = {
    "pessoas_amarelas": "Int64",
    "pessoas_brancas": "Int64",
    "pessoas_indigenas": "Int64",
    "pessoas_pretas": "Int64",
}

use_cols = [
    "cd_setor",
    "situacao",
    "cd_sit",
    "cd_tipo",
    "area_km2",
    "cd_regiao",
    "nm_regiao",
    "cd_uf",
    "nm_uf",
    "cd_mun",
    "nm_mun",
    "cd_dist",
    "nm_dist",
    "cd_subdist",
    "nm_subdist",
    "cd_bairro",
    "nm_bairro",
    "cd_nu",
    "nm_nu",
    "cd_fcu",
    "nm_fcu",
    "cd_aglom",
    "nm_aglom",
    "cd_rgint",
    "nm_rgint",
    "cd_rgi",
    "nm_rgi",
    "cd_concurb",
    "nm_concurb",
    "raca_cor",
    "tile_x",
    "tile_y",
    "quadkey",
    "geometry",
]

ano = 2022
table_name = f"raca_cor_{ano}"
schema_name = "public"

data_folder = os.path.join(
    "/home/kylefelipe/mapa_repo/estudos_tematicos/COR_RACA",
    str(ano),
    "pontos_aleatorios",
)
bcim_path = "./estados_brasil.csv"

bcim_df = pd.read_csv(bcim_path, sep=";")

# Publicando os dados no banco de dados

Vamos listar todos os arquivos disponíveis no diretório de trabalho para publicação no banco de dados.

In [7]:
files = glob.glob(os.path.join(data_folder, "*.gpkg"))
# ordena por nome do arquivo
files.sort()
print(f"Quantidade de arquivos: {len(files)}")

Quantidade de arquivos: 27


Publicar os arquivos em uma tabela no banco de dados, criando a tabela na primeira vez que o código é executado e fazendo a inserção dos dados nas próximas vezes.

In [None]:
# files = [
#     "/home/kylefelipe/mapa_repo/estudos_tematicos/COR_RACA/2022/pontos_aleatorios/es_pontos_pessoas.gpkg",
#     "/home/kylefelipe/mapa_repo/estudos_tematicos/COR_RACA/2022/pontos_aleatorios/df_pontos_pessoas.gpkg",
#     "/home/kylefelipe/mapa_repo/estudos_tematicos/COR_RACA/2022/pontos_aleatorios/mg_pontos_pessoas.gpkg",
# ]


def to_integer(x):
    try:
        return int(x)
    except:
        return None


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Envia os dados para o banco de dados")
    parser.add_argument(
        "-s", "--start", type=int, default=None, help="Estado de início"
    )
    parser.add_argument("-u", "--uf", type=str, default=None, help="Sigla da UF")
    args = parser.parse_args()

    start_state = args.start or 0
    state_slice = slice(start_state, start_state + 5)
    sliced_files = files
    if start_state:
        sliced_files = sliced_files[state_slice]
    elif args.uf:
        print(f"Enviando apenas os arquivos da UF {args.uf}")
        sliced_files = [
            file for file in files if os.path.basename(file).startswith(args.uf)
        ]

    engine = create_engine(
        f"postgresql://{DB_CONFIGS['DB_USER']}:{DB_CONFIGS['DB_PASSWORD']}@{DB_CONFIGS['DB_HOST']}:{DB_CONFIGS['DB_PORT']}/{DB_CONFIGS['DB_NAME']}"
    )


    bar_files = tqdm(
        sliced_files,
        desc="Enviando para o banco de dados",
        unit="arquivo",
        total=len(sliced_files),
        leave=True,
        position=0,
    )

    files_bar_log = tqdm(total=0, position=1, bar_format="{desc}")
    chunk_log = tqdm(total=0, position=2, bar_format="{desc}")
    log_file = "./log.txt"
    filename = None
    for file in bar_files:
        try:
            if not os.path.exists(file):
                raise (FileNotFoundError(f"O arquivo {file} não foi encontrado!"))
            filename = os.path.basename(file)
            gpkg_table_name = filename.split(".")[0]
            sigla_uf = filename.split("_")[0].lower()
            uf_data = bcim_df[bcim_df["sigla"] == sigla_uf.upper()]
            cd_uf = uf_data["geocodigo"].values[0]

            count_data_sql = f"""SELECT COUNT(*) FROM {schema_name}.{table_name} WHERE "cd_uf" = '{cd_uf}';"""

            # Conta a quantidade de dados existentes da UF para não duplicar
            start_fid = 0
            with engine.connect() as conn:
                # verifica se a tabela existe
                if engine.dialect.has_table(conn, table_name, schema=schema_name):

                    files_bar_log.set_description_str(
                        f"Contando os dados do estado {sigla_uf} - {uf_data['nome'].values[0]}"
                    )
                    start_fid = conn.execute(text(count_data_sql)).fetchone()[0]
            files_bar_log.set_description_str(f"  Enviando {filename}")
            chunk = 1

            while True:
                slice_obj = slice(start_fid, start_fid + chunk_size)
                gdf = gpd.read_file(
                    file,
                    rows=slice_obj,
                    columns=use_cols,
                )
                # renomeia geometry para geom
                start_fid += chunk_size
                if gdf.empty:
                    break

                gdf.rename_geometry("geom", inplace=True)
                chunk_log.set_description_str(f"    Enviando Chunk {chunk}!")

                # transforma os campos em campos_pessoas em inteiros

                # for campo, typo in campos_pessoas.items():
                #     gdf[campo] = gdf[campo].astype(typo)
                # remove os campo em campos_pessoas

                gdf.to_postgis(
                    table_name,
                    engine,
                    schema=schema_name,
                    if_exists="append",
                    index=False,
                    dtype={
                        "pessoas_amarelas": Integer(),
                        "pessoas_brancas": Integer(),
                        "pessoas_indigenas": Integer(),
                        "pessoas_pretas": Integer(),
                    },
                )
                chunk += 1
                del gdf

        except KeyboardInterrupt:
            print("\nInterrompido pelo usuário!")
            break
        except Exception as e:
            mode = "a" if os.path.exists(log_file) else "w"
            with open(log_file, mode) as f:
                f.write(f"Erro ao inserir o arquivo {file}\n")
                if filename:
                    f.write(f"Arquivo: {filename}\n")
                f.write(f"{e}\n")
                f.write(format_exc())
                f.write("\n")
            print(f"Erro ao inserir o arquivo {file}")
            print(e)
            print(format_exc())
            continue

usage: ipykernel_launcher.py [-h] [-s START] [-u UF]
ipykernel_launcher.py: error: unrecognized arguments: --f=/run/user/1000/jupyter/runtime/kernel-v3de85b37f6d99608f6e6ea22f96aea59b64074ed4.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
