In [None]:
import geopandas as gpd
import pandas as pd
import shapely
import numpy as np
import os
from pathlib import Path
from sqlalchemy import create_engine
from shapely import wkt
from shapely import validation
from shapely.validation import make_valid
from dotenv import load_dotenv

In [None]:
# Caminho do arquivo e seleção de colunas | File path and column selection
ibama_path = os.getenv("IBAMA_PATH")
use_cols = [
    'GEOM_AREA_EMBARGADA', 'UF', 'QTD_AREA_EMBARGADA', 
    'NUM_LONGITUDE_TAD', 'NUM_LATITUDE_TAD', 'MUNICIPIO',
    'DES_STATUS_FORMULARIO', 'TIPO_AREA', 'DAT_EMBARGO',
    'DAT_DESEMBARGO', 'DAT_ULT_ALTER_GEOM', 
    'DAT_ULT_ALTERACAO', 'ULTIMA_ATUALIZACAO_RELATORIO'
]

date_cols = ['DAT_EMBARGO', 'DAT_DESEMBARGO', 'DAT_ULT_ALTER_GEOM', 'DAT_ULT_ALTERACAO', 'ULTIMA_ATUALIZACAO_RELATORIO'] 

# Processo de extração e transformação inicial | Initial Extraction and Transformation (ETL)
df_mt = (
    pd.read_csv(
        ibama_path,
        sep=';',                       # Separador de colunas | Column separator
        encoding='UTF-8',               # Codificação de caracteres | Character encoding
        on_bad_lines='warn',           # Alerta sobre linhas corrompidas | Warns about corrupted lines
        usecols=use_cols,              # Filtro de colunas na leitura | Selective column loading
        date_format='%Y-%m-%d',        # Formato da data (Ano-Mês-Dia) | Date format (YYYY-MM-DD)
    )
    .loc[lambda x: x['UF'] == 'MT']    # Filtra apenas Mato Grosso | Filters for MT state only
    .sort_values(by='DAT_EMBARGO')     # Ordenação cronológica | Chronological sorting
    .reset_index(drop=True)            # Redefinição do índice | Index reset
)

df_mt[date_cols] = df_mt[date_cols].apply(pd.to_datetime, errors='coerce') # Conversão para datetime | Datetime conversion

# Exibe os primeiros registros | Displays the first records
df_mt.head()

In [None]:
# 1. Definição de Pontos Válidos (Coordenadas) | Valid Points Definition (Coordinates)
# Remove nulos e zeros (erros de GPS) | Removes NaNs and zeros (GPS errors)
valid_points = (
    df_mt['NUM_LONGITUDE_TAD'].notna() & (df_mt['NUM_LONGITUDE_TAD'] != 0) & 
    df_mt['NUM_LATITUDE_TAD'].notna() & (df_mt['NUM_LATITUDE_TAD'] != 0)
)

# 2. Definição de Geometrias Válidas | Valid Geometries Definition
# Limpa espaços e strings "NAN" residuais | Cleans spaces and residual "NAN" strings
valid_geoms = (
    df_mt['GEOM_AREA_EMBARGADA'].notna() & 
    (df_mt['GEOM_AREA_EMBARGADA'].astype(str).str.strip() != "") &
    (df_mt['GEOM_AREA_EMBARGADA'].astype(str).str.upper() != "NAN")
)

# 3. Aplicação do Filtro Lógico (OU) | Logical Filter Application (OR)
# Mantém se houver ponto OU geometria | Keeps row if it has point OR geometry
df_mt = df_mt.loc[valid_points | valid_geoms].copy()

# Exibe o dataframe limpo | Displays the cleaned dataframe
df_mt


In [None]:
# 1. Identificação de Embargos Ativos vs. Desembargados 
# Identification of Active vs. Lifted (Desembargados) Embargoes

# Criamos uma coluna de flag para facilitar a análise
# We create a flag column for easier analysis
df_mt['is_active'] = df_mt['DAT_DESEMBARGO'].isna()

# 2. Filtragem por Status do Formulário | Filtering by Form Status
# Verificamos os tipos únicos para garantir que não estamos filtrando errado
# Check unique types to ensure correct filtering
# print(df_mt['DES_STATUS_FORMULARIO'].unique())

# Filtramos apenas os 'Lavrados' (Embargos reais e oficiais)
# We filter only 'Lavrados' (Official and filed embargoes)
df_issued_embargoes = df_mt.loc[df_mt['DES_STATUS_FORMULARIO'].str.upper() == 'LAVRADO'].reset_index(drop=True).copy()

# 3. Análise de Embargos Cancelados ou em análise
# Analysis of Canceled or under-review embargoes
df_lifted_embargoes = df_mt.loc[df_mt['DES_STATUS_FORMULARIO'].str.upper() == 'CANCELADO'].reset_index(drop=True).copy()

df_issued_embargoes.head(5)


In [None]:
# Obtém a data atual / Get current date
today = pd.Timestamp.now().normalize()

# Calcula a diferença de tempo em anos / Calculate time difference in years
days_difference = today - df_issued_embargoes['DAT_EMBARGO']
float_years = (days_difference.dt.days / 365.25)

# Calcula a idade do embargo como inteiro / Calculate embargo age as integer
df_issued_embargoes['embargo_age'] = np.floor(float_years).astype('Int64')

# Define data de referência com substituição (fallback) / Set reference date with fallback
df_issued_embargoes['is_date_estimated'] = df_issued_embargoes['DAT_EMBARGO'].isna()
df_issued_embargoes['reference_date'] = df_issued_embargoes['DAT_EMBARGO'].fillna(df_issued_embargoes['DAT_ULT_ALTER_GEOM'])


# Ordena pela data do embargo / Sort by embargo date
df_issued_embargoes.sort_values(by='DAT_EMBARGO')


In [None]:
# Define os deslocamentos temporais / Define date offsets
offset_5 = pd.DateOffset(years=5)
offset_1 = pd.DateOffset(years=1)

# Define marcos temporais e limites / Set date milestones and thresholds
cutoff_date = pd.Timestamp('2008-07-22')
recent_threshold = today - offset_5
active_threshold = today - offset_1

# Define as condições lógicas para classificação / Define logical conditions for classification
cond_future = df_issued_embargoes['reference_date'] > today
cond_missing = df_issued_embargoes['reference_date'].isna()
cond_consolidated = df_issued_embargoes['reference_date'] < cutoff_date
cond_transition = (df_issued_embargoes['reference_date'] >= cutoff_date) & (df_issued_embargoes['reference_date'] < recent_threshold)
cond_recent = (df_issued_embargoes['reference_date'] >= recent_threshold) & (df_issued_embargoes['reference_date'] <= active_threshold)
cond_activate = df_issued_embargoes['reference_date'] >= active_threshold

# Agrupa as condições em uma lista / Group conditions into a list
conditions = [
    cond_future,
    cond_missing,
    cond_consolidated,
    cond_transition,
    cond_recent,
    cond_activate
]

# Define os rótulos correspondentes / Define corresponding labels
choices = [
    'inconclusive_history',
    'insufficient_data',
    'consolidated_area',
    'post_code_transition',
    'recent_violation',
    'active_enforcement'
]

# Creating the column with the legal classification
df_issued_embargoes['compliance_status'] = np.select(conditions, choices, default='undefined')

# Optimal practice: convert to category to save memory
df_issued_embargoes['compliance_status'] = df_issued_embargoes['compliance_status'].astype('category')

df_issued_embargoes.loc[lambda x: x['compliance_status'] == 'recent_violation']

# Validação final: Verificar se sobrou algum "undefined" ou lógica quebrada
print(df_issued_embargoes['compliance_status'].value_counts(dropna=False))

In [None]:
# Função segura para carregar WKT tratando erros / Safe function to load WKT handling errors
def safe_wkt_load(wkt_string):
    if pd.isna(wkt_string):
        return None
    
    try:
        real_geoms = wkt.loads(wkt_string)
        return real_geoms
    except:
        return None

# Processa geometrias (WKT e Coordenadas) / Process geometries (WKT and Coordinates)
df_issued_embargoes['geometry_wkt'] = df_issued_embargoes['GEOM_AREA_EMBARGADA'].apply(safe_wkt_load)
df_issued_embargoes['geometry_point'] = gpd.points_from_xy(df_issued_embargoes['NUM_LONGITUDE_TAD'], df_issued_embargoes['NUM_LATITUDE_TAD'])
df_issued_embargoes.head(2)

In [None]:
# Filtra coordenadas inválidas (nulas ou zero) / Filter invalid coordinates (null or zero)
mask = (df_issued_embargoes['NUM_LONGITUDE_TAD'].notna()) & (df_issued_embargoes['NUM_LONGITUDE_TAD'] != 0)

df_issued_embargoes.loc[~mask, 'geometry_point'] = None

# Prioriza geometria WKT, preenchendo falhas com Pontos / Prioritize WKT geometry, filling gaps with Points
df_issued_embargoes['final_geometry'] = df_issued_embargoes['geometry_wkt'].fillna(df_issued_embargoes['geometry_point'])

# Cria GeoDataFrame e define CRS original (WGS84) / Create GeoDataFrame and set original CRS (WGS84)
gdf = gpd.GeoDataFrame(df_issued_embargoes, geometry='final_geometry', crs="EPSG:4326")

gdf = gdf.rename_geometry('geometry')

# Identifica tipo de geometria para processamento separado / Identify geometry type for separate processing
gdf['is_point_origin'] = gdf.geometry.type == 'Point'

gdf_points = gdf[gdf.geometry.type == 'Point'].copy()
gdf_polys = gdf[gdf.geometry.type != 'Point'].copy()

# Aplica projeção métrica e buffer em pontos / Apply metric projection and buffer to points
if not gdf_points.empty:
    gdf_points = gdf_points.to_crs(epsg=31982)
    # Cria buffer de 50 metros ao redor do ponto / Create 50-meter buffer around the point
    gdf_points['geometry'] = gdf_points.geometry.buffer(50)

# Aplica projeção métrica em polígonos / Apply metric projection to polygons
if not gdf_polys.empty:
    gdf_polys = gdf_polys.to_crs(epsg=31982)

# Unifica os dados processados / Merge processed data
gdf_final_postgis = pd.concat([gdf_points, gdf_polys], ignore_index=True)

# Remove colunas auxiliares de geometria / Drop auxiliary geometry columns
cols_to_drop = ['geometry_wkt', 'geometry_point']

gdf_final_postgis = gdf_final_postgis.drop(columns=cols_to_drop, errors='warn')
gdf_final_postgis = gdf_final_postgis.dropna(subset=['geometry'])

print(f"Total de linhas prontas para o PostGIS: {len(gdf_final_postgis)}")
print(f"CRS Final: {gdf_final_postgis.crs}")
gdf_final_postgis.head()

In [None]:
# Configuração da conexão com o banco de dados / Database connection configuration
db_connection_str = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}@{os.getenv('DB_HOST')}:5432/{os.getenv('DB_NAME')}"
engine = create_engine(db_connection_str)

# 1. Assegura que o objeto é um GeoDataFrame com CRS correto / Ensure object is a GeoDataFrame with correct CRS
# (O pd.concat pode perder metadados espaciais / pd.concat might lose spatial metadata)
gdf_final_postgis = gpd.GeoDataFrame(
    gdf_final_postgis, 
    geometry='geometry', 
    crs="EPSG:31982"
)

# 2. Correção de Geometrias Inválidas / Fix Invalid Geometries
# Corrige polígonos "laço" que o PostGIS rejeita / Fix "bowtie" polygons rejected by PostGIS
gdf_final_postgis['geometry'] = gdf_final_postgis['geometry'].apply(
    lambda geom: make_valid(geom) if geom is not None else None
)

# 3. Filtro de Segurança Final / Final Safety Filter
# Mantém apenas Polígonos ou MultiPolígonos / Keep only Polygons or MultiPolygons
gdf_final_postgis = gdf_final_postgis[
    gdf_final_postgis.geometry.type.isin(['Polygon', 'MultiPolygon'])
]

print(f"Total Final Validado: {len(gdf_final_postgis)}")

try:
    # Exporta para PostGIS / Export to PostGIS
    gdf_final_postgis.to_postgis(
        name='ibama_embargos_mt_final',
        con=engine,
        if_exists='fail',  # Falha se tabela já existir para segurança / Fail if table exists for safety
        index=False
    )
    print("Tabela criada com sucesso!")
except ValueError:
    print("Aviso: A tabela já existe no PostGIS. Nenhuma alteração foi feita. / Warning: Table already exists in PostGIS. No changes made.")
except Exception as e:
    print(f"Ocorreu um erro inesperado: {e}")