# Compatibilizador de malhas censitárias
---

### Importações

In [80]:
import warnings
warnings.simplefilter(action='ignore')

import os
import geopandas as gpd
import pandas as pd
import networkx as nx
from shapely import LineString, Polygon, MultiPolygon, distance, intersects, minimum_bounding_radius as min_radius
from shapely.geometry import box
from shapely.wkt import loads, dumps

In [81]:
### Célula para conectar com Google Drive
from google.colab import drive
drive.mount('/content/drive')

if not os.getcwd().endswith('Censo IBGE 2022/Compatibilização'):
    os.chdir('/content/drive/Shareddrives/SIG LabCidade/projetos/Censo IBGE 2022/Compatibilização')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [109]:
nome_compat = 'PIC001022-SSSQQQ-MSP'
nome_A = 'PIC001022'
nome_B = 'SSSQQQ'

In [110]:
if not os.path.isdir(nome_compat):
    os.mkdir(nome_compat)

### Funções gerais

In [84]:
# Projeção das camadas
UTMCRS ='EPSG:31983'

In [85]:
# Função/regra para avaliar se geometria deve ser considerada expúria
def geomNotEspuria(geom):
    ap_ratio = geom.area/geom.length
    return (ap_ratio > 1.5 or geom.area > 100)

### Seleção de malhas

In [86]:
### Seleção para cruzamento
muns = ['3550308']

In [87]:
# Perímetros comuns
gdf_A = gpd.read_file('2000-2010-2022/perimetros_compativeis.gpkg', layer='2000-2010-2022')
gdf_A = gdf_A.to_crs("EPSG:31983")
gdf_A = gdf_A.query('CD_MUN in @muns')
gdf_A = gdf_A[['CD_PERIMETRO','geometry']]

In [88]:
gdf_A

Unnamed: 0,CD_PERIMETRO,geometry
9267,3550308010000001,"POLYGON ((338045.039 7393102.869, 337942.496 7..."
9268,3550308010000002,"POLYGON ((338051.402 7393341.588, 338063.448 7..."
9269,3550308010000003,"POLYGON ((338177.051 7393526.400, 338153.803 7..."
9270,3550308010000004,"POLYGON ((338933.568 7394599.604, 338931.029 7..."
9271,3550308010000005,"POLYGON ((338995.810 7394484.007, 339006.335 7..."
...,...,...
21068,3550308960000174,"MULTIPOLYGON (((354198.231 7397249.966, 354185..."
21069,3550308960000175,"POLYGON ((354220.439 7397241.658, 354239.973 7..."
21070,3550308960000176,"POLYGON ((354214.344 7397366.249, 354132.481 7..."
21071,3550308960000177,"POLYGON ((354660.947 7397648.538, 354708.972 7..."


In [89]:
# Quadras fiscais
gdf_B = gpd.read_file('../../../dados/RMSP-MSP_Cadastro-Logradouros-Quadras/GeoSampa_QuadrasFiscais_MSP.gpkg')
gdf_B = gdf_B.query('qd_tx_tipo == "FISCAL"')
gdf_B['SSSQQQ'] = gdf_B.apply(lambda x: f'{x["qd_setor"]}{x["qd_fiscal"]}', axis=1)
gdf_B = gdf_B[['SSSQQQ','geometry']]

## Grafo de compatibilização

In [90]:
G_compat = nx.Graph()

# Adicionar todos os nós
for i, row in gdf_A[['CD_PERIMETRO', 'geometry']].iterrows():
    G_compat.add_node(f"A.{row['CD_PERIMETRO']}",
                      malha='A',
                      nome=row['CD_PERIMETRO'],
                      geom=row['geometry'],
                      center=row['geometry'].representative_point(),)
for i, row in gdf_B[['SSSQQQ', 'geometry']].iterrows():
    G_compat.add_node(f"B.{row['SSSQQQ']}",
                      malha='B',
                      nome=row['SSSQQQ'],
                      geom=row['geometry'],
                      center=row['geometry'].representative_point(),)

### Análise de herança
Objetivo é buscar setores em B que tenham resultado de divisão de setores em A, e então vinculá-los.
Esta operação conceitualmente se dá em setores com Redesenho ou Desassociação em A (o setor já existia e foi fracionado ou teve vizinhança refeita) com setores com Desassociação, Sobrescrito, Insersão e Criação em B.

In [91]:
'''# Selecionar setores para análise
alterados_A = gdf_A.copy()
alterados_B = gdf_B.copy()

# Renomear colunas
alterados_A = alterados_A.rename(columns={k:f'A.{k}' for k in alterados_A.columns})
alterados_A = alterados_A.set_geometry('A.geometry', crs=UTMCRS)
alterados_B = alterados_B.rename(columns={k:f'B.{k}' for k in alterados_B.columns})
alterados_B = alterados_B.set_geometry('B.geometry', crs=UTMCRS)

# Calcular áreas originais
alterados_A['A.AREA_ORIGINAL'] = alterados_A.area
alterados_B['B.AREA_ORIGINAL'] = alterados_B.area

# Correção contra bug GEOSException: TopologyException: found non-noded intersection
alterados_A['A.geometry'] = [loads(dumps(geom, rounding_precision=3)) for geom in alterados_A['A.geometry']]
alterados_B['B.geometry'] = [loads(dumps(geom, rounding_precision=3)) for geom in alterados_B['B.geometry']]

# Interseção
intersecao = gpd.overlay(alterados_A, alterados_B, how='union')\
                .dropna(subset=['A.CD_PERIMETRO', 'B.SSSQQQ'])

# Calcular áreas resultantes
intersecao['B.PCT_AREA'] = intersecao.area*100/intersecao['B.AREA_ORIGINAL']

# Registrar herança (>80% da área original de B em um único setor de A)
heranca = intersecao.query('`B.PCT_AREA` >= 80')
for i, row in heranca.iterrows():
    G_compat.add_edge(f"A.{row['A.CD_PERIMETRO']}",
                      f"B.{row['B.SSSQQQ']}",
                      metodo='Herança')'''

'# Selecionar setores para análise\nalterados_A = gdf_A.copy()\nalterados_B = gdf_B.copy()\n\n# Renomear colunas\nalterados_A = alterados_A.rename(columns={k:f\'A.{k}\' for k in alterados_A.columns})\nalterados_A = alterados_A.set_geometry(\'A.geometry\', crs=UTMCRS)\nalterados_B = alterados_B.rename(columns={k:f\'B.{k}\' for k in alterados_B.columns})\nalterados_B = alterados_B.set_geometry(\'B.geometry\', crs=UTMCRS)\n\n# Calcular áreas originais\nalterados_A[\'A.AREA_ORIGINAL\'] = alterados_A.area\nalterados_B[\'B.AREA_ORIGINAL\'] = alterados_B.area\n\n# Correção contra bug GEOSException: TopologyException: found non-noded intersection\nalterados_A[\'A.geometry\'] = [loads(dumps(geom, rounding_precision=3)) for geom in alterados_A[\'A.geometry\']]\nalterados_B[\'B.geometry\'] = [loads(dumps(geom, rounding_precision=3)) for geom in alterados_B[\'B.geometry\']]\n\n# Interseção\nintersecao = gpd.overlay(alterados_A, alterados_B, how=\'union\')                .dropna(subset=[\'A.CD_PERI

### Sobreposição com buffers

O objetivo é tratar todos os casos restantes, atribuindo-os conforme a sobreposição geométrica entre malhas, permitindo para isso uma inconsistência de desenho entre as malhas correspondente à distância do buffer. Geometrias expúrias (razão área/perímetro baixa) são excluídas após interseção.

In [92]:
buffer_size = -5

In [93]:
# Selecionar aptos para sobreposição
### (usando todos exceto Manutenção, anteriormente filtrado para exceto Manutenção e Ajuste)
selecionados_A = gdf_A.copy()
selecionados_B = gdf_B.copy()

selecionados_A = selecionados_A.rename(columns={k:f'A.{k}' for k in selecionados_A.columns})
selecionados_B = selecionados_B.rename(columns={k:f'B.{k}' for k in selecionados_B.columns})

# Selecionar setores ainda não vinculados
list_isolados = list(nx.isolates(G_compat))

list_isolados_A =[i.split('.')[-1] for i in list_isolados if i.startswith('A.')]
isolados_A = gdf_A.query('CD_PERIMETRO in @list_isolados_A')
isolados_A = isolados_A.rename(columns={k:f'A.{k}' for k in isolados_A.columns})

list_isolados_B =[i.split('.')[-1] for i in list_isolados if i.startswith('B.')]
isolados_B = gdf_B.query('SSSQQQ in @list_isolados_B')
isolados_B = isolados_B.rename(columns={k:f'B.{k}' for k in isolados_B.columns})

In [94]:
'''# Isolados de A para B
isolados_A_buffer = isolados_A.copy()
isolados_A_buffer['A.geometry'] = isolados_A_buffer['A.geometry'].buffer(buffer_size)
isolados_A_buffer = isolados_A_buffer.set_geometry('A.geometry', crs=UTMCRS)
### Restituir geometria para setores pequenos (buffer isolados A)
for i, row in isolados_A_buffer.iterrows():
    if row['A.geometry'].is_empty:
        isolados_A_buffer.loc[i, 'A.geometry'] = isolados_A.loc[i, 'A.geometry']

buffered_B = selecionados_B.copy()
buffered_B['B.geometry'] = buffered_B['B.geometry'].buffer(buffer_size)
### Restituir geometria para setores pequenos (buffer B)
for i, row in buffered_B.iterrows():
    if row['B.geometry'].is_empty:
        buffered_B.loc[i, 'B.geometry'] = selecionados_B.loc[i, 'B.geometry']
### Restituir coluna de geometria
buffered_B = buffered_B.set_geometry('B.geometry', crs=UTMCRS)
### Interseção
intersecao = gpd.overlay(isolados_A_buffer, buffered_B, how='union')\
                .dropna(subset=['A.CD_PERIMETRO', 'B.SSSQQQ'])

### Remoção geometrias expúrias
intersecao = intersecao[intersecao['geometry'].apply(geomNotEspuria)]

### Adição das arestas
for i, row in intersecao.iterrows():
    G_compat.add_edge(f"A.{row['A.CD_PERIMETRO']}",
                      f"B.{row['B.SSSQQQ']}",
                      metodo='Buffer AB')'''

'# Isolados de A para B\nisolados_A_buffer = isolados_A.copy()\nisolados_A_buffer[\'A.geometry\'] = isolados_A_buffer[\'A.geometry\'].buffer(buffer_size)\nisolados_A_buffer = isolados_A_buffer.set_geometry(\'A.geometry\', crs=UTMCRS)\n### Restituir geometria para setores pequenos (buffer isolados A)\nfor i, row in isolados_A_buffer.iterrows():\n    if row[\'A.geometry\'].is_empty:\n        isolados_A_buffer.loc[i, \'A.geometry\'] = isolados_A.loc[i, \'A.geometry\']\n\nbuffered_B = selecionados_B.copy()\nbuffered_B[\'B.geometry\'] = buffered_B[\'B.geometry\'].buffer(buffer_size)\n### Restituir geometria para setores pequenos (buffer B)\nfor i, row in buffered_B.iterrows():\n    if row[\'B.geometry\'].is_empty:\n        buffered_B.loc[i, \'B.geometry\'] = selecionados_B.loc[i, \'B.geometry\']\n### Restituir coluna de geometria\nbuffered_B = buffered_B.set_geometry(\'B.geometry\', crs=UTMCRS)\n### Interseção\nintersecao = gpd.overlay(isolados_A_buffer, buffered_B, how=\'union\')          

In [95]:
# Isolados de B para A
isolados_B_buffer = isolados_B.copy()
isolados_B_buffer['B.geometry'] = isolados_B_buffer['B.geometry'].buffer(buffer_size)
isolados_B_buffer = isolados_B_buffer.set_geometry('B.geometry', crs=UTMCRS)
### Restituir geometria para setores pequenos (buffer isolados B)
for i, row in isolados_B_buffer.iterrows():
    if row['B.geometry'].is_empty:
        isolados_B_buffer.loc[i, 'B.geometry'] = isolados_B.loc[i, 'B.geometry']

buffered_A = selecionados_A.copy()
buffered_A['A.geometry'] = buffered_A['A.geometry'].buffer(buffer_size)
### Restituir geometria para setores pequenos
for i, row in buffered_A.iterrows():
    if row['A.geometry'].is_empty:
        buffered_A.loc[i, 'A.geometry'] = selecionados_A.loc[i, 'A.geometry']
### Restituir coluna de geometria
buffered_A = buffered_A.set_geometry('A.geometry', crs=UTMCRS)
### Interseção
intersecao = gpd.overlay(isolados_B_buffer, buffered_A, how='union')\
                .dropna(subset=['A.CD_PERIMETRO', 'B.SSSQQQ'])

### Remoção geometrias expúrias
intersecao = intersecao[intersecao['geometry'].apply(geomNotEspuria)]

### Adição das arestas
for i, row in intersecao.iterrows():
    G_compat.add_edge(f"A.{row['A.CD_PERIMETRO']}",
                      f"B.{row['B.SSSQQQ']}",
                      metodo='Buffer BA')

### Sobreposição forçada

In [96]:
'''# Intervalo de buffers até 0
buffer_sizes = [-5, 0]'''

'# Intervalo de buffers até 0\nbuffer_sizes = [-5, 0]'

In [97]:
'''# Selecionar aptos para sobreposição (todos)
selecionados_A = gdf_A.copy()
selecionados_B = gdf_B.copy()

selecionados_A = selecionados_A.rename(columns={k:f'A.{k}' for k in selecionados_A.columns})
selecionados_B = selecionados_B.rename(columns={k:f'B.{k}' for k in selecionados_B.columns})

# Configurar geodataframes
selecionados_A = selecionados_A.set_geometry('A.geometry', crs=UTMCRS)
selecionados_B = selecionados_B.set_geometry('B.geometry', crs=UTMCRS)'''

"# Selecionar aptos para sobreposição (todos)\nselecionados_A = gdf_A.copy()\nselecionados_B = gdf_B.copy()\n\nselecionados_A = selecionados_A.rename(columns={k:f'A.{k}' for k in selecionados_A.columns})\nselecionados_B = selecionados_B.rename(columns={k:f'B.{k}' for k in selecionados_B.columns})\n\n# Configurar geodataframes\nselecionados_A = selecionados_A.set_geometry('A.geometry', crs=UTMCRS)\nselecionados_B = selecionados_B.set_geometry('B.geometry', crs=UTMCRS)"

In [98]:
'''# Executa sobreposição com buffers crescentes até zero
for buffer_size in buffer_sizes:
    # Selecionar setores ainda não vinculados
    list_isolados = list(nx.isolates(G_compat))

    list_isolados_A = [i.split('.')[-1] for i in list_isolados if i.startswith('A.')]
    isolados_A = gdf_A.query('CD_PERIMETRO in @list_isolados_A')
    isolados_A = isolados_A.rename(columns={k:f'A.{k}' for k in isolados_A.columns})

    list_isolados_B =[i.split('.')[-1] for i in list_isolados if i.startswith('B.')]
    isolados_B = gdf_B.query('SSSQQQ in @list_isolados_B')
    isolados_B = isolados_B.rename(columns={k:f'B.{k}' for k in isolados_B.columns})

    # Configurar geodataframes
    isolados_A = isolados_A.set_geometry('A.geometry', crs=UTMCRS)
    isolados_B = isolados_B.set_geometry('B.geometry', crs=UTMCRS)

    # Buffers
    buffered_A = selecionados_A.copy()
    buffered_A['A.geometry'] = buffered_A['A.geometry'].buffer(buffer_size)
    ### Restituir geometria para setores pequenos
    for i, row in buffered_A.iterrows():
        if row['A.geometry'].is_empty:
            buffered_A.loc[i, 'A.geometry'] = selecionados_A.loc[i, 'A.geometry']
    ### Restituir coluna de geometria
    buffered_A = buffered_A.set_geometry('A.geometry', crs=UTMCRS)

    buffered_B = selecionados_B.copy()
    buffered_B['B.geometry'] = buffered_B['B.geometry'].buffer(buffer_size)
    ### Restituir geometria para setores pequenos (buffer B)
    for i, row in buffered_B.iterrows():
        if row['B.geometry'].is_empty:
            buffered_B.loc[i, 'B.geometry'] = selecionados_B.loc[i, 'B.geometry']
    ### Restituir coluna de geometria
    buffered_B = buffered_B.set_geometry('B.geometry', crs=UTMCRS)

    # Isolados de A para B
    ### Interseção
    intersecao = gpd.overlay(isolados_A, buffered_B, how='union')\
                    .dropna(subset=['A.CD_PERIMETRO', 'B.SSSQQQ'])

    ### Adição das arestas
    for i, row in intersecao.iterrows():
        G_compat.add_edge(f"A.{row['A.CD_PERIMETRO']}",
                        f"B.{row['B.SSSQQQ']}",
                        metodo='Forçado AB')

    # Isolados de B para A
    ### Interseção
    intersecao = gpd.overlay(isolados_B, buffered_A, how='union')\
                    .dropna(subset=['A.CD_PERIMETRO', 'B.SSSQQQ'])

    ### Adição das arestas
    for i, row in intersecao.iterrows():
        G_compat.add_edge(f"A.{row['A.CD_PERIMETRO']}",
                        f"B.{row['B.SSSQQQ']}",
                        metodo='Forçado BA')'''

'# Executa sobreposição com buffers crescentes até zero\nfor buffer_size in buffer_sizes:\n    # Selecionar setores ainda não vinculados\n    list_isolados = list(nx.isolates(G_compat))\n\n    list_isolados_A = [i.split(\'.\')[-1] for i in list_isolados if i.startswith(\'A.\')]\n    isolados_A = gdf_A.query(\'CD_PERIMETRO in @list_isolados_A\')\n    isolados_A = isolados_A.rename(columns={k:f\'A.{k}\' for k in isolados_A.columns})\n\n    list_isolados_B =[i.split(\'.\')[-1] for i in list_isolados if i.startswith(\'B.\')]\n    isolados_B = gdf_B.query(\'SSSQQQ in @list_isolados_B\')\n    isolados_B = isolados_B.rename(columns={k:f\'B.{k}\' for k in isolados_B.columns})\n\n    # Configurar geodataframes\n    isolados_A = isolados_A.set_geometry(\'A.geometry\', crs=UTMCRS)\n    isolados_B = isolados_B.set_geometry(\'B.geometry\', crs=UTMCRS)\n\n    # Buffers\n    buffered_A = selecionados_A.copy()\n    buffered_A[\'A.geometry\'] = buffered_A[\'A.geometry\'].buffer(buffer_size)\n    ### Re

### Assersão de compatibilização completa

In [99]:
try:
    assert not list(nx.isolates(G_compat))
except AssertionError:
    print(len(list(nx.isolates(G_compat))), 'nós não conectados')

293 nós não conectados


### Codificação das componentes do grafo de compatibilização

In [100]:
componentes = [{i+1:list(j)} for i, j in enumerate(nx.connected_components(G_compat))]

matriz_A = []
matriz_B = []
for comp in componentes:
    n, c = list(comp.items())[0]
    cod_c = f"{n:05d}"

    c_A = [G_compat.nodes[i]['nome'] for i in c if G_compat.nodes[i]['malha']=='A']
    matriz_A.append({'CD_PICSQ':cod_c, 'CD_PERIMETRO':c_A})
    c_B = [G_compat.nodes[i]['nome'] for i in c if G_compat.nodes[i]['malha']=='B']
    matriz_B.append({'CD_PICSQ':cod_c, 'SSSQQQ':c_B})

# Criação dos DataFrames finais
df_matriz_A = pd.DataFrame(matriz_A)
df_matriz_A = df_matriz_A.explode('CD_PERIMETRO')
df_matriz_B = pd.DataFrame(matriz_B)
df_matriz_B = df_matriz_B.explode('SSSQQQ')

### Exportação de matrizes de compatibilidade

In [106]:
df_matriz_A[['CD_PERIMETRO', 'CD_PICSQ']].dropna().to_csv(f'{nome_compat}/matriz_compat_{nome_A}.csv', sep='\t', index=False)
df_matriz_B[['SSSQQQ', 'CD_PICSQ']].dropna().to_csv(f'{nome_compat}/matriz_compat_{nome_B}.csv', sep='\t', index=False)

In [107]:
# Contagem de membros A dos perímetros
data_matriz_A = df_matriz_A.pivot_table(index='CD_PICSQ',
                                        values='CD_PERIMETRO',
                                        aggfunc='count').reset_index()
data_matriz_A = data_matriz_A.rename(columns={'CD_PERIMETRO':'membros_A'})
# Contagem de membros B dos perímetros
data_matriz_B = df_matriz_B.pivot_table(index='CD_PICSQ',
                                        values='SSSQQQ',
                                        aggfunc='count').reset_index()
data_matriz_B = data_matriz_B.rename(columns={'SSSQQQ':'membros_B'})
# Agregação dos dados
data_matrizes = data_matriz_A.merge(data_matriz_B, on='CD_PICSQ')
data_matrizes['membros'] = data_matrizes['membros_A'] + data_matrizes['membros_B']

### Geopackage de perímetros compatíveis

In [111]:
def removeHoles(geom, area_min=1):
    if isinstance(geom, Polygon):
        geom = MultiPolygon([geom])
    out_polys = []
    for part in geom.geoms:
        interiors = []
        for i in part.interiors:
            p = Polygon(i)
            if p.area > area_min:
                interiors.append(i)
        out_polys.append(Polygon(part.exterior.coords, holes=interiors))
    return MultiPolygon(out_polys) if len(out_polys)>1 else out_polys[0]

gdf_perim_compat = df_matriz_A.merge(gdf_A, on='CD_PERIMETRO')
gdf_perim_compat = gdf_perim_compat.merge(data_matrizes, on='CD_PICSQ')
gdf_perim_compat = gpd.GeoDataFrame(gdf_perim_compat, geometry='geometry', crs=UTMCRS)
gdf_perim_compat = gdf_perim_compat[['CD_PICSQ', 'membros', 'membros_A', 'membros_B', 'geometry']].dissolve(by='CD_PICSQ')
gdf_perim_compat['geometry'] = gdf_perim_compat['geometry'].apply(removeHoles)
gdf_perim_compat.to_file(f'{nome_compat}/perimetros_compativeis.gpkg',
                         layer=f'{nome_A}-{nome_B}-A',
                         driver='GPKG')

gdf_perim_compat = df_matriz_B.merge(gdf_B, on='SSSQQQ')
gdf_perim_compat = gdf_perim_compat.merge(data_matrizes, on='CD_PICSQ')
gdf_perim_compat = gpd.GeoDataFrame(gdf_perim_compat, geometry='geometry', crs=UTMCRS)
gdf_perim_compat = gdf_perim_compat[['CD_PICSQ', 'membros', 'membros_A', 'membros_B', 'geometry']].dissolve(by='CD_PICSQ')
gdf_perim_compat['geometry'] = gdf_perim_compat['geometry'].apply(removeHoles)
gdf_perim_compat.to_file(f'{nome_compat}/perimetros_compativeis.gpkg',
                         layer=f'{nome_A}-{nome_B}-B',
                         driver='GPKG')

### Exportação de geopackage com grafo de compatibilização

In [112]:
# Arestas
edge_data = []
for u, v in list(G_compat.edges):
    data_u = G_compat.nodes[u]
    data_u = {f"{data_u['malha']}.{k}":value for k, value in data_u.items()}
    data_v = G_compat.nodes[v]
    data_v = {f"{data_v['malha']}.{k}":value for k, value in data_v.items()}
    data_u.update(data_v)
    data_u.update(G_compat.edges[(u, v)])
    data_u['geometry'] = LineString([data_u['A.center'], data_u['B.center']])
    edge_data.append(data_u)

edge_gdf = gpd.GeoDataFrame(edge_data, geometry='geometry', crs=UTMCRS)
edge_gdf = edge_gdf[['A.nome', 'B.nome', 'metodo', 'geometry']]
edge_gdf.to_file(f'{nome_compat}/grafo_compatibilizacao.gpkg',
                    layer=f'{nome_A}-{nome_B}_edges',
                    driver='GPKG')

# Nós
node_data = [i for _, i in list(G_compat.nodes.data())]
for k in node_data:
    k['grau'] = len(G_compat[f"{k['malha']}.{k['nome']}"])
node_gdf = gpd.GeoDataFrame(node_data, geometry='center', crs=UTMCRS)
node_gdf = node_gdf[['nome', 'malha', 'grau', 'center']]
node_gdf.to_file(f'{nome_compat}/grafo_compatibilizacao.gpkg',
                    layer=f'{nome_A}-{nome_B}_nodes',
                    driver='GPKG')