In [1]:
import geopandas as gpd
import json

In [2]:
import asyncio
import logging
import sys

import httpx
import requests
from requests.exceptions import RequestException
from tenacity import before_sleep_log, retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential

In [3]:
from json import JSONDecodeError
import xmltodict

In [4]:
from requests.exceptions import HTTPError

class ResponseNotJson(HTTPError):
    '''Raised when a response body that was expected to be JSON cannot be parsed as JSON.'''

class ResponseNotXML(HTTPError):
    '''Raised when a response that was expected to be XML cannot be obtained or parsed.'''

class PaginationError(HTTPError):
    '''Raised when the number of features retrieved through pagination differs
    from the total number of features reported by the API.'''

In [5]:
def raise_for_status(func):
    '''Decorator that validates the HTTP response returned by ``func``.

    ``func`` must return an object exposing ``raise_for_status()`` and a
    ``content`` attribute (for example a ``requests.Response``).

    The decorated function:
      * calls ``func`` with the same arguments;
      * calls ``response.raise_for_status()``, letting whatever it raises
        propagate (HTTP errors for 4xx/5xx status codes);
      * returns ``response.content``.

    ``functools.wraps`` keeps the wrapped function's name and docstring, so
    logs and tracebacks point at the real function instead of ``decorated``.
    '''
    # Local import keeps this cell self-contained without touching the
    # notebook's import cells.
    import functools

    @functools.wraps(func)
    def decorated(*args, **kwargs):
        response = func(*args, **kwargs)
        response.raise_for_status()
        return response.content

    return decorated


def json_response(func):
    '''Decorator that parses the bytes returned by ``func`` as JSON.

    ``func`` must return UTF-8 encoded ``bytes``. The decorated function
    returns the decoded JSON document.

    Raises:
        ResponseNotJson: when the payload is not valid JSON. The message
            carries the payload parsed as XML when that is possible, and the
            raw decoded text otherwise. The original ``JSONDecodeError`` is
            chained as the cause.
    '''
    # Local import keeps this cell self-contained without touching the
    # notebook's import cells.
    import functools

    @functools.wraps(func)
    def decorated(*args, **kwargs):
        raw = func(*args, **kwargs)
        json_txt = raw.decode('utf-8')
        try:
            return json.loads(json_txt)
        except JSONDecodeError as err:
            # The body may be neither JSON nor XML (HTML error page, empty
            # body...). In that case the XML parser would raise its own error
            # and hide the real problem, so fall back to the raw text.
            try:
                detail = xmltodict.parse(raw)
            except Exception:
                detail = json_txt
            raise ResponseNotJson(f'Response is not a JSON: {detail}') from err

    return decorated

def xml_response(func):
    '''Decorator that parses the XML returned by ``func`` into a dict.

    The decorated function calls ``func`` and feeds its result to
    ``xmltodict.parse``.

    Raises:
        ResponseNotXML: for any exception raised while calling ``func`` or
            while parsing its result. The original exception is chained as
            the cause so the real failure stays visible in the traceback.
    '''
    # Local import keeps this cell self-contained without touching the
    # notebook's import cells.
    import functools

    @functools.wraps(func)
    def decorated(*args, **kwargs):
        try:
            response = func(*args, **kwargs)
            return xmltodict.parse(response)
        except Exception as e:
            raise ResponseNotXML(f'XML parsing failed {e}') from e

    return decorated

In [6]:
# Module-level logger; it is handed to tenacity's before_sleep_log so retry attempts get logged.
logger = logging.getLogger(__name__)

class BaseClient:
    """WFS base client - used to make generic requests.

    Request URLs have the form
    ``<domain>/?service=WFS&version=<version>&request=<capability>[&<key>=<value>...]``.
    Query values are concatenated as given (no percent-encoding is applied
    here), so callers must not pass values containing ``&`` or ``#``.

    Calling the instance dispatches on ``output_format`` and returns the
    response as parsed JSON, parsed XML or raw bytes.
    """

    accepted_versions = {"2.0.0"}
    output_formats = {"json", "xml", "bytes"}
    # Timeout, in seconds, applied to every HTTP request (sync and async).
    request_timeout = 300
    # Maximum number of requests in flight at once in wfs_async_requests.
    max_concurrent_requests = 4

    def __init__(self, domain: str, version: str = "2.0.0", verbose: bool = True):
        """
        Args:
            domain: base URL of the WFS service; one trailing slash is removed.
            version: WFS version, must be in ``accepted_versions``.
            verbose: stored on the instance; no method of this class reads it.

        Raises:
            ValueError: if ``version`` is not accepted.
        """
        self.domain = self.__clean_domain(domain)
        self.version = self.__assert_version(version)
        self.host = self.__gen_host()
        self.verbose = verbose

    def __clean_domain(self, domain: str) -> str:
        """Removes a single trailing slash from the domain, if present."""
        if domain.endswith("/"):
            domain = domain[:-1]

        return domain

    def __assert_version(self, version: str) -> str:
        """Returns ``version`` unchanged or raises ValueError if unsupported."""
        if version not in self.accepted_versions:
            raise ValueError(f"Accepted versions: {self.accepted_versions}")
        return version

    def __gen_host(self) -> str:
        """Builds the URL prefix shared by every request."""
        wfs_version = f"/?service=WFS&version={self.version}"
        return self.domain + wfs_version

    def __solve_get_params(self, *ignore, **query_params: dict) -> str:
        """Joins the keyword arguments as ``key=value`` pairs separated by ``&``."""
        request_args = ["=".join([param_name, str(param_value)]) for param_name, param_value in query_params.items()]
        query_string = "&".join(request_args)

        return query_string

    def __solve_req_capability(self, capability: str) -> str:
        """Builds the ``request=<capability>`` query fragment."""
        capability_param = f"request={capability}"

        return capability_param

    def __solve_request_url(self, capability: str, **query_params: dict) -> str:
        """Builds the full request URL for a capability and its query params."""
        base_url = self.host
        capability_param = self.__solve_req_capability(capability)
        url = base_url + "&" + capability_param

        # **query_params is always a dict (never None); an emptiness check is
        # what prevents a dangling "&" when no extra parameter is given.
        if not query_params:
            return url

        req_params = self.__solve_get_params(**query_params)
        return url + "&" + req_params

    @retry(
        retry=retry_if_exception_type(RequestException),
        stop=stop_after_attempt(10),
        wait=wait_random_exponential(5, min=5, max=60),
        before_sleep=before_sleep_log(logger, logging.INFO, exc_info=True),
    )
    @raise_for_status
    def wfs_generic_request(self, capability: str, *ignore, **query_params: dict) -> bytes:
        """Performs a GET request and returns the raw response body.

        The ``raise_for_status`` decorator turns the response into its
        ``content`` (bytes) after checking the status code. Any
        ``RequestException`` raised in the process is retried, up to 10
        attempts with randomized exponential waits.
        """
        url = self.__solve_request_url(capability, **query_params)

        # The response body is bytes, so callers must decode it accordingly.
        # A timeout is always set: without one a stalled server would block
        # this call indefinitely.
        with requests.get(url, timeout=self.request_timeout) as response:
            return response

    async def wfs_async_requests(self, capability: str, pages, **query_params: dict):
        """Fetches several pages concurrently and merges them into one JSON dict.

        Args:
            capability: WFS request name.
            pages: iterable of ``startIndex`` values, one request per value.
            **query_params: extra query parameters sent with every request.

        Returns:
            The parsed JSON of the first page, with the ``features`` of every
            following page appended to its ``features`` list.

        Raises:
            ValueError: if ``pages`` is empty.
            Exception: the first failure reported by a page request, raised
                after all requests finished.
            ResponseNotJson: if a page body is not valid JSON.
        """
        start_indexes = list(pages)
        if not start_indexes:
            raise ValueError("pages must contain at least one start index")

        params = query_params.copy()
        async with httpx.AsyncClient() as client:

            @retry(
                retry=retry_if_exception_type(httpx.HTTPError),
                stop=stop_after_attempt(10),
                wait=wait_random_exponential(30, min=30, max=300),
                before_sleep=before_sleep_log(logger, logging.INFO, exc_info=True),
            )
            async def fetch(url, semaphore):
                # The semaphore is held only for the duration of one attempt.
                async with semaphore:
                    response = await client.get(url, timeout=self.request_timeout)
                    response.raise_for_status()
                    return response

            tasks = []
            semaphore = asyncio.Semaphore(self.max_concurrent_requests)

            for page in start_indexes:
                params["startIndex"] = page
                url = self.__solve_request_url(capability, **params)
                task = asyncio.create_task(fetch(url, semaphore))
                tasks.append(task)

            responses = await asyncio.gather(*tasks, return_exceptions=True)

        # gather(return_exceptions=True) hands failures back as values. They
        # must be surfaced here, otherwise the parsing below would break with
        # an unrelated AttributeError on the exception object.
        failures = [response for response in responses if isinstance(response, BaseException)]
        if failures:
            for failure in failures:
                logger.error("Request failed: %s", failure)
            raise failures[0]

        @json_response
        def parse_response(response):
            return response.content

        responses = [parse_response(response) for response in responses]

        first_response = responses[0]
        for response in responses[1:]:
            first_response["features"].extend(response["features"])

        return first_response

    @json_response
    def get_json(self, capability: str, **query_params: dict) -> dict:
        """Requests the capability asking for JSON output and parses the body."""
        return self.wfs_generic_request(
            capability, outputFormat="application/json", exceptions="application/json", **query_params
        )

    @xml_response
    def get_xml(self, capability: str, **query_params: dict) -> dict:
        """Requests the capability and parses the XML body into a dict."""
        return self.wfs_generic_request(capability, **query_params)

    def __call__(self, capability: str, *ignore, output_format="json", pages=None, **query_params: dict) -> "dict | bytes":
        """Performs a request and returns it in the requested format.

        Args:
            capability: WFS request name.
            output_format: one of ``output_formats``; ``"json"`` and ``"xml"``
                return a dict, ``"bytes"`` returns the raw body.
            pages: accepted for interface compatibility; not used here.
            **query_params: extra query parameters.

        Raises:
            ValueError: if ``output_format`` is not supported.
        """
        if output_format not in self.output_formats:
            raise ValueError(f"output_format must be in {self.output_formats}")

        if output_format == "json":
            return self.get_json(capability, **query_params)

        elif output_format == "xml":
            return self.get_xml(capability, **query_params)

        else:
            return self.wfs_generic_request(capability, **query_params)


In [7]:
from typing import Union


class CQLFilter:
    '''Builds a CQL filter for querying the WFS server.

    Clauses are accumulated in ``self.cql_filter``; calling the instance
    returns the accumulated string. Each ``point_within_*``,
    ``polygon_within_*`` and ``properties_equals`` call appends one clause,
    separated from the previous content by ``;`` (the default separator of
    ``add_to_filter``).
    '''

    def __init__(self, feature_name:str, schema:dict)->None:
        '''
        Args:
            feature_name: name of the feature (layer) being filtered.
            schema: mapping of feature name to its properties; only the entry
                for ``feature_name`` is kept. Raises KeyError if it is absent.
        '''
        self.feature_name = feature_name
        self.schema = schema[feature_name]
        self.cql_filter = ''

    def add_to_filter(self, query_str:str, sep:str=';')->None:
        '''Appends ``query_str`` to the filter, prefixing it with ``sep`` when
        the filter already has content.'''
        if len(self.cql_filter)<1:
            self.cql_filter=query_str
        else:
            self.cql_filter += sep + query_str

    def __check_propertie_in_schema(self, prop_name:str)->None:
        '''Raises ValueError when ``prop_name`` is not a key of the feature schema.'''
        if prop_name not in self.schema:
            raise ValueError(f'Property name {prop_name} must be in {self.schema.keys()}')

    def __propertie_equals(self, propertie_name:str, equals_to:Union[str, int, float])->str:
        '''Builds a single ``(property=value)`` clause.

        String values are wrapped in single quotes; embedded single quotes are
        doubled so a value such as ``O'Brien`` cannot break the expression.
        '''
        self.__check_propertie_in_schema(propertie_name)
        if isinstance(equals_to, str):
            escaped = equals_to.replace("'", "''")
            equals_to = f"'{escaped}'"

        return f"({propertie_name}={equals_to})"

    def properties_equals(self, *ignore, **propertie_comparisons)->None:
        '''Appends one clause requiring every given property to equal its value.

        The equality tests are combined with ``AND`` and added to the filter
        as a single clause. All property names are validated before the
        filter is modified; calling this with no comparisons is a no-op.

        Raises:
            ValueError: if any property name is not in the feature schema.
        '''
        if not propertie_comparisons:
            return

        clauses = [
            self.__propertie_equals(prop_name, prop_val)
            for prop_name, prop_val in propertie_comparisons.items()
        ]
        # Joining first avoids emitting a separator immediately followed by a
        # dangling AND when the filter already holds other clauses.
        self.add_to_filter(' AND '.join(clauses))

    def _add_dwithin(self, geometry_column:str, geometry_wkt:str, precision:int)->None:
        '''Appends ``DWITHIN(<column>,<geometry>,<precision>,meters)`` to the filter.'''
        self.add_to_filter(f'DWITHIN({geometry_column},{geometry_wkt},{precision},meters)')

    def point_within_pol(self, x:float, y:float, precision:int=5)->None:
        '''Adds a clause matching features whose ``ge_poligono`` geometry is
        within ``precision`` meters of the point (x, y).'''
        self._add_dwithin('ge_poligono', f'POINT({x} {y})', precision)

    def point_within_linha(self, x:float, y:float, precision:int=5)->None:
        '''Adds a clause matching features whose ``ge_linha`` geometry is
        within ``precision`` meters of the point (x, y).'''
        self._add_dwithin('ge_linha', f'POINT({x} {y})', precision)

    def point_within_multipol(self, x:float, y:float, precision:int=5)->None:
        '''Adds a clause matching features whose ``ge_multipoligono`` geometry
        is within ``precision`` meters of the point (x, y).'''
        self._add_dwithin('ge_multipoligono', f'POINT({x} {y})', precision)

    def polygon_within_pol(self, coordinates:str, precision:int=5)->None:
        '''Adds a clause matching features whose ``ge_poligono`` geometry is
        within ``precision`` meters of the given polygon.

        ``coordinates`` is inserted verbatim inside ``POLYGON(...)``.
        NOTE(review): WKT rings need their own parentheses, so callers
        presumably pass e.g. ``"(x1 y1, x2 y2, ...)"`` - confirm.
        '''
        self._add_dwithin('ge_poligono', f'POLYGON({coordinates})', precision)

    def polygon_within_linha(self, coordinates:str, precision:int=5)->None:
        '''Adds a clause matching features whose ``ge_linha`` geometry is
        within ``precision`` meters of the given polygon.

        ``coordinates`` is inserted verbatim inside ``POLYGON(...)``.
        '''
        self._add_dwithin('ge_linha', f'POLYGON({coordinates})', precision)

    def polygon_within_multipol(self, coordinates:str, precision:int=5)->None:
        '''Adds a clause matching features whose ``ge_multipoligono`` geometry
        is within ``precision`` meters of the given polygon.

        ``coordinates`` is inserted verbatim inside ``POLYGON(...)``.
        '''
        self._add_dwithin('ge_multipoligono', f'POLYGON({coordinates})', precision)

    def __call__(self)->str:
        '''Returns the filter string built so far.'''
        return self.cql_filter

In [8]:
import warnings

In [9]:
class Paginator:
    """Retrieves every feature of a layer when one response is not enough.

    Pagination is keyset based: each request is sorted by an index column and
    the next request only asks for rows whose index is greater than the last
    one received. The index column must therefore hold unique, sortable
    values; otherwise rows sharing the boundary value are skipped and a
    ``PaginationError`` is raised by the final count check.
    """

    def __init__(self, get_function: "BaseClient", schemas: dict = None) -> None:
        """
        Args:
            get_function: callable used to perform requests; it is called as
                ``get_function("GetFeature", typeName=..., sortBy=..., **params)``
                and must return a dict with a ``"features"`` list.
            schemas: optional mapping of feature name to its parsed schema,
                used to look up the default index column (``"id_col"``).
        """
        self.schemas = schemas or {}
        self.get = get_function

    def extract_total_features(self, resp: dict) -> int:
        """Returns the total number of features reported by the server."""
        return resp["totalFeatures"]

    def extract_returned_quantity(self, resp: dict) -> int:
        """Returns how many features the response actually carries."""
        return len(resp["features"])

    def needs_pagination(self, total_features: int, returned_quantity: int) -> bool:
        """True when the server reports more features than it returned."""
        return total_features > returned_quantity

    def get_steps(self, total_features: int, returned_quantity: int) -> list:
        """Returns the start offset of each page, using the size of the first
        response as page size.

        Raises:
            ValueError: if ``returned_quantity`` is not positive, since the
                page size cannot be inferred from an empty response.
        """
        if returned_quantity <= 0:
            raise ValueError(
                "Cannot paginate: the first response returned no features, so the page size is unknown"
            )
        # Starts at zero because the first query wasn't ordered, so its rows
        # have to be fetched again in index order.
        return list(range(0, total_features, returned_quantity))

    def warn_steps(self, feature_name: str, steps: list) -> None:
        """Warns about how many requests the pagination is going to perform."""
        total_steps = len(steps)

        warnings.warn(f"Paginação iniciada. Serão realizadas {total_steps} requisições para a {feature_name}")

    def get_index_col(self, feature_name: str, index_col: str = None) -> str:
        """Resolves the column used to sort and page through the feature.

        An explicit ``index_col`` wins (with a warning, as it is not
        validated). Otherwise the ``"id_col"`` entry of the feature schema is
        used.

        Raises:
            ValueError: if no explicit column is given and none can be found
                in the schemas.
        """
        if index_col:
            warnings.warn("Certifique-se que a index_col de fato é uma coluna existente na feature")
            return index_col

        if not self.schemas:
            raise ValueError("Must set schemas if willing to paginate and not specify index col")

        feature_schemas = self.schemas.get(feature_name, None)
        if feature_schemas is None:
            raise ValueError(f"Schema for feature {feature_name} not found. Must specify index col")

        index_col = feature_schemas.get("id_col")
        if index_col is None:
            raise ValueError(f"Feature {feature_name} has no defaul index col. Must specify index col")

        return index_col

    @staticmethod
    def _next_page_filter(index_col: str, last_value, base_filter: str = None) -> str:
        """Builds the filter selecting the rows that come after ``last_value``.

        String values are quoted (embedded quotes doubled). When the caller
        supplied its own filter, it is kept and combined with ``AND``.
        """
        if isinstance(last_value, str):
            literal = "'" + last_value.replace("'", "''") + "'"
        else:
            literal = str(last_value)

        keyset_clause = f"{index_col}>{literal}"
        if base_filter:
            return f"({base_filter}) AND {keyset_clause}"
        return keyset_clause

    def paginate(self, feature_name: str, resp: dict, index_col: str = None, *_ignored, **query_params) -> dict:
        """Completes ``resp`` with every feature of the layer.

        Args:
            feature_name: layer being queried.
            resp: first (unpaginated) response; returned unchanged when it
                already holds every feature, otherwise its ``"features"``
                list is rebuilt in place.
            index_col: column used for sorting/paging; looked up in the
                schemas when omitted.
            **query_params: parameters of the original request. A
                ``cql_filter`` given here is preserved on every page.

        Raises:
            ValueError: if the index column cannot be resolved or the first
                response is empty.
            PaginationError: if the number of collected features differs
                from the total reported by the server.
        """
        total_features = self.extract_total_features(resp)
        returned_quantity = self.extract_returned_quantity(resp)

        if not self.needs_pagination(total_features, returned_quantity):
            return resp

        print("Paginação iniciada")
        index_col = self.get_index_col(feature_name, index_col)

        steps = self.get_steps(total_features, returned_quantity)
        self.warn_steps(feature_name, steps)

        # Rebuild the feature list from scratch, now in index order.
        resp["features"] = []

        # The caller's own filter has to be part of every page request;
        # otherwise pages after the first would return unfiltered rows.
        base_filter = query_params.get("cql_filter")
        step_params = dict(query_params)

        for _ in steps:
            resp_step = self.get("GetFeature", typeName=feature_name, sortBy=index_col, **step_params)
            features_step = resp_step["features"]
            if not features_step:
                # Nothing left on the server; the count check below reports
                # the mismatch, if any.
                break
            resp["features"].extend(features_step)
            max_index = features_step[-1]["properties"][index_col]
            step_params["cql_filter"] = self._next_page_filter(index_col, max_index, base_filter)

        total_returned_features = len(resp["features"])
        if total_returned_features != total_features:
            raise PaginationError(
                f"""Difference in total features and paginated features: 
                          total - {total_features} vs returned - {total_returned_features}
                          """
            )

        return resp

    def __call__(self, feature_name: str, resp: dict, index_col: str = None, *_ignored, **query_params) -> dict:
        """Shortcut for :meth:`paginate`."""
        return self.paginate(feature_name, resp, index_col, **query_params)


In [10]:
import warnings
from collections import OrderedDict

class FeatureMdataParser:
    '''Converts the feature-type listing returned by the server into a
    mapping of feature name -> ordered property metadata.'''

    def parse_property(self, prop:dict)->dict:
        '''Returns ``{name: {"nullable": ..., "dtype": ...}}`` for one property.'''

        return {
            prop['name']: {
                'nullable': prop['nillable'],
                'dtype': prop['localType'],
            }
        }

    def get_cd_identificador(self, prop:dict, cd_col_name:str='cd_identifica')->bool:
        '''Tells whether the property name, lower-cased, starts with ``cd_col_name``.'''

        lowered_name = prop['name'].lower()
        return lowered_name.startswith(cd_col_name)

    def set_cd_identificador(self, properties:dict, prop:dict)->None:
        '''Records ``prop`` as the feature's ``id_col`` when it looks like an
        identifier column and no ``id_col`` has been recorded yet.'''

        looks_like_identifier = self.get_cd_identificador(prop)
        if not looks_like_identifier:
            return

        if properties.get('id_col'):
            # the first identifier column found wins
            return

        properties['id_col'] = prop['name']

    def parse_feature_schema(self, feat:dict)->dict:
        '''Returns ``{typeName: properties}`` for one feature type. The
        ``id_col`` entry, when found, is stored among the properties.'''

        feature_name = feat['typeName']
        collected = OrderedDict()
        for raw_property in feat['properties']:
            collected.update(self.parse_property(raw_property))
            self.set_cd_identificador(collected, raw_property)

        return {feature_name: collected}

    def raise_for_no_id(self, parsed_feature:dict)->None:
        '''Emits a warning (it does not raise) for every feature without ``id_col``.'''

        without_id = [
            feature_name
            for feature_name, mdata in parsed_feature.items()
            if 'id_col' not in mdata
        ]
        for feature_name in without_id:
            warnings.warn(f"Could not identify id col for feature {feature_name}")

    def parse_all_features(self, get_feature_resp:dict)->dict:
        '''Parses every entry of ``featureTypes``, keeping the response order.'''

        all_schemas = OrderedDict()
        for feature in get_feature_resp['featureTypes']:
            feature_schema = self.parse_feature_schema(feature)
            all_schemas.update(feature_schema)
            self.raise_for_no_id(feature_schema)

        return all_schemas

    def __call__(self, get_feature_resp:dict)->dict:
        '''Shortcut for :meth:`parse_all_features`.'''

        return self.parse_all_features(get_feature_resp)

In [11]:
class GeoSampaWfs(BaseClient):
    '''GeoSampa WFS client: schema discovery plus feature download with
    optional automatic pagination.

    Requests are delegated to an internal ``BaseClient`` stored in
    ``self.get``.
    '''

    def __init__(self, domain:str, set_schemas:bool=True, verbose:bool=True, auto_paginate:bool=True):
        '''
        Args:
            domain: base URL of the WFS service.
            set_schemas: when True the feature schemas are downloaded on
                construction (one request); otherwise ``self.schemas`` is None.
            verbose: forwarded to ``BaseClient``.
            auto_paginate: default pagination behaviour of ``get_feature``.
        '''
        # Initialise the inherited state as well, so that methods inherited
        # from BaseClient also work on this instance.
        super().__init__(domain=domain, verbose=verbose)

        self.get = BaseClient(domain=domain, verbose=verbose)
        self.parse_features_schemas = FeatureMdataParser()

        self.schemas = self.get_feature_schemas() if set_schemas else None
        self.paginate = Paginator(self.get, self.schemas)

        self.auto_paginate = auto_paginate

    def __list_feature_types_raw(self):
        '''Returns the raw DescribeFeatureType response.'''

        return self.get('DescribeFeatureType')

    def get_feature_schemas(self):
        '''Requests and parses the schema of every feature type.'''

        resp  = self.__list_feature_types_raw()
        return self.parse_features_schemas(resp)

    def __solve_properties(self, query_params, properties:list=None):
        '''Adds ``propertyName`` (comma separated) to ``query_params``, in
        place, when ``properties`` is given.'''

        if properties:
            names = ','.join(properties)
            query_params['propertyName']=names

    def __check_feature_exists(self, feature_name:str)->None:
        '''Raises ValueError for unknown features. The check is skipped when
        no schemas are loaded.'''

        if self.schemas and feature_name not in self.schemas:
            raise ValueError(f"Feature name {feature_name} doesn't exits")

    def get_feature(self, feature_name:str, properties:list=None, filter:CQLFilter=None, paginate:bool=None,
                    index_col:str=None, **query_params):
        '''Downloads the features of a layer.

        Args:
            feature_name: layer name.
            properties: optional list of property names to retrieve.
            filter: optional ``CQLFilter``; its string is sent as ``cql_filter``.
            paginate: True/False forces pagination on/off; None falls back to
                ``self.auto_paginate``.
            index_col: column used for pagination; resolved from the schemas
                when omitted.
            **query_params: extra query parameters.

        Returns:
            The GetFeature response as a dict - complete when pagination is
            enabled, otherwise exactly what the first request returned.

        Raises:
            ValueError: if schemas are loaded and the feature is unknown.
        '''

        self.__check_feature_exists(feature_name)
        self.__solve_properties(query_params, properties)

        if filter is not None:
            query_params['cql_filter'] = filter()

        resp = self.get('GetFeature', typeName=feature_name, **query_params)

        if paginate or (paginate is None and self.auto_paginate):
            return self.paginate(feature_name, resp, index_col, **query_params)

        # Without pagination the first response is the result; returning it
        # explicitly avoids handing None back to the caller.
        return resp

In [12]:
import os
import shutil
import warnings

from dotenv import load_dotenv

In [13]:
# In a .py module this would be: ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
# Here the parent of the current working directory is used instead:
# os.path.abspath('') resolves to the working directory, dirname() takes its parent.
ROOT_DIR = os.path.dirname(os.path.abspath(''))
print (ROOT_DIR)

F:\General Projects\Git\GitHub\sepep-pmsp\saade_governancadados


In [14]:
class MissingEnvironmentVariable(RuntimeError):
    """Raised when a required environment variable is not defined."""

def get_dotenv_path(root: str = ROOT_DIR, example: bool = False) -> str:
    """Builds the path of the ``.env`` file inside ``root``.

    With ``example=True`` the path of ``.env.example`` is returned instead.
    """

    filename = ".env.example" if example else ".env"
    return os.path.join(root, filename)


def dotenv_exits(dotenv_path: str = None, root: str = ROOT_DIR):
    """Tells whether the .env file exists.

    ``dotenv_path`` is checked when given; otherwise the default ``.env``
    path inside ``root`` is checked.
    """

    if not dotenv_path:
        dotenv_path = get_dotenv_path(root)

    return os.path.exists(dotenv_path)


def solve_dot_env(root: str = ROOT_DIR) -> str:
    """Returns the path of ``.env`` inside ``root``, creating the file as a
    copy of ``.env.example`` (with a warning) when it does not exist yet."""

    dotenv_path = get_dotenv_path(root)

    if dotenv_exits(dotenv_path):
        return dotenv_path

    warnings.warn(".env file not found: copying .env.example")
    shutil.copy(get_dotenv_path(root, example=True), dotenv_path)

    return dotenv_path


def load_env(root: str = ROOT_DIR) -> None:
    """Loads the variables of the ``.env`` file found in ``root``.

    The file path comes from ``solve_dot_env``, which creates ``.env`` from
    ``.env.example`` when it is missing.
    """

    load_dotenv(dotenv_path=solve_dot_env(root))


def load_var(varname: str, root: str = ROOT_DIR) -> str:
    """Loads the .env file of ``root`` and returns the value of ``varname``.

    Raises:
        MissingEnvironmentVariable: if the variable is not defined after
            the .env file has been loaded.
    """
    load_env(root)

    try:
        value = os.environ[varname]
    except KeyError:
        raise MissingEnvironmentVariable(f"Environment var {varname} not defined")

    return value


In [15]:
# WFS endpoint taken from the environment; load_var loads the .env file first
# and raises MissingEnvironmentVariable when the variable is not defined.
GEOSAMPA_WFS_DOMAIN = load_var('GEOSAMPA_WFS_DOMAIN')

def get_client(domain=GEOSAMPA_WFS_DOMAIN):
    """Creates a ``GeoSampaWfs`` client for ``domain``.

    The domain actually used is printed, so the output stays truthful when a
    domain other than the default is passed.
    """
    print (domain)

    return GeoSampaWfs(domain)

In [16]:
# Instantiate the client; with the default arguments GeoSampaWfs also downloads
# the feature schemas during construction (available as geosampa.schemas).
geosampa = get_client()

http://wfs.geosampa.prefeitura.sp.gov.br/geoserver/ows




In [17]:
# The client already downloads the schemas when it is constructed; reuse them
# and only hit the server again if they were not loaded.
schemas = geosampa.schemas or geosampa.get_feature_schemas()



In [18]:
# Layers currently configured; used to make sure all of them still exist in GeoSampa.
# Each layer is listed once ('distrito_municipal' used to appear twice).

camadas_atuais = [
    'GEOSAMPA_cadparcs_area_protecao_apa',
    'area_contaminada_reabilitada_svma',
    'patrimonio_cultural_area_envoltoria_CONDEPHAAT',
    'patrimonio_cultural_area_envoltoria_CONPRESP',
    'patrimonio_cultural_area_envoltoria_IPHAN',
    'patrimonio_cultural_lugar_paisagistico_ambiental',
    'calcada',
    'classificacao_viaria_cet',
    'decreto_utilidade_publica_interesse_social',
    'operacao_urbana',
    'GEOSAMPA_cadparcs_parque_estadual',
    'GEOSAMPA_cadparcs_parque_outra_secretaria',
    'requalifica_centro_perimetro_especial',
    'requalifica_centro_perimetro_geral',
    'area_risco_geologico',
    'GEOSAMPA_cadparcs_reserva_particular_natural',
    'patrimonio_cultural_sitio_arqueologico',
    'termo_compromisso_ambiental_svma',
    'GEOSAMPA_terra_indigena',
    'patrimonio_cultural_bem_tombado',
    'GEOSAMPA_cadparcs_unidade_conservacao_existente',
    'zoneamento_2016_map1',
    'restricao_geotecnica',
    'remanescente_pmma',
    'manancial_billings',
    'manancial_guarapiranga',
    'manancial_juquery',
    'restricao_mirante_santana',
    'linha_alta_tensao',
    'transpetro_duto',
    'geoconvias_faixa_nao_edificavel',
    'geoconvias_lei_melhoramento_vigente',
    'rampa_heliponto_licenciado',
    'distrito_municipal',
    'subprefeitura',
]

In [19]:
# Map every configured layer to whether the server still publishes it,
# then list the ones that are missing.
valida_camadas_existentes = {camada: camada in schemas for camada in camadas_atuais}
print("\n".join(camada for camada, encontrada in valida_camadas_existentes.items() if not encontrada))

GEOSAMPA_cadparcs_parque_estadual
GEOSAMPA_cadparcs_parque_outra_secretaria
GEOSAMPA_cadparcs_reserva_particular_natural
GEOSAMPA_cadparcs_unidade_conservacao_existente
rampa_heliponto_licenciado


In [20]:
# Accumulators for the schema report:
#   dict_layer_id_col - layer name -> id column (None when the layer has none)
#   result_lines      - one tab-separated line per property of every layer
#   existe_lines      - same lines, restricted to the layers in camadas_atuais
existe_lines = []
dict_layer_id_col = {}
dict_layer_schema = {}

existe = False
empty_id_cols = 0
result_lines = []
result = ''

for camada, layer_schema in schemas.items():
    i = 0
    existe = camada in camadas_atuais

    has_id_col = 'id_col' in layer_schema
    id_col = layer_schema['id_col'] if has_id_col else ''
    dict_layer_id_col[camada] = id_col if has_id_col else None
    if not has_id_col:
        print(f"{camada} - Não tem atributo 'id_col' !!!")
        empty_id_cols += 1

    for col, prop_dict in layer_schema.items():
        # 'id_col' is bookkeeping stored alongside the properties, not a column
        if col == 'id_col':
            continue

        key = 'Sim' if col == id_col else 'Não'
        nullable = 'Sim' if prop_dict['nullable'] else 'Não'
        # columns: layer, property, is id column?, nullable?, data type
        result = '\t'.join([camada, col, key, nullable, prop_dict['dtype']])
        if existe:
            existe_lines.append(result)
        result_lines.append(result)

print(f"Total keys: {len(dict_layer_id_col)} - Empty keys: {empty_id_cols}")

geohabisp_vw_wfs_cortico_geosampa - Não tem atributo 'id_col' !!!
geohabisp_vw_wfs_favela_geosampa - Não tem atributo 'id_col' !!!
geohabisp_vw_wfs_loteamento_geosampa - Não tem atributo 'id_col' !!!
geohabisp_vw_wfs_nucleo_geosampa - Não tem atributo 'id_col' !!!
Total keys: 395 - Empty keys: 4


In [21]:
import datetime

# Timestamp prefix shared by the three files written below.
dh_arquivo = f"{datetime.datetime.now():%Y%m%d_%H%M}"

# Make sure the destination exists; open() does not create directories.
output_dir = os.path.join(ROOT_DIR, "output")
os.makedirs(output_dir, exist_ok=True)

# file name suffix -> content
reports = {
    "schema_keys": json.dumps(dict_layer_id_col),   # layer -> id column
    "schema_geosampa": '\n'.join(result_lines),     # every layer published by the server
    "schema": '\n'.join(existe_lines),              # only the layers in camadas_atuais
}

for report_suffix, report_content in reports.items():
    log_fname = os.path.join(output_dir, f"{dh_arquivo}_{report_suffix}.txt")
    # Explicit encoding: the report contains non-ASCII text ("Não") and must
    # not depend on the platform's default encoding.
    with open(log_fname, 'w', encoding='utf-8') as report_file:
        report_file.write(report_content)

In [22]:
# Day/month/year hour:minute. In strftime "%m" is the month and "%M" the
# minute; they were swapped (and a stray "<" was present) in the old format.
concluido_em = f"{datetime.datetime.now():%d/%m/%Y %H:%M}"
print(f"Processamento concluído: {concluido_em}")

Processamento concluído: 30/41</2024 18:08
