In [None]:
#default_exp parser
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Parser
> Funções principais de Leitura e Processamento dos dados provenientes dos Bancos de Dados

In [None]:
#export
from pathlib import Path
import re
import xml.etree.ElementTree as et
from zipfile import ZipFile
from datetime import datetime
import collections
import pandas as pd
import pandas_read_xml as pdx
from anateldb.query import *

In [None]:
#export
def read_stel(pasta, update=False):
    """Lê o banco de dados salvo localmente do STEL. Opcionalmente o atualiza pelo Banco de Dados ANATELBDRO01."""
    if update:
        update_stel(pasta)
    if (file := Path(f'{pasta}/stel.fth')).exists():
        stel = pd.read_feather(file)
    elif (file := Path(f'{pasta}/stel.csv')).exists():
        stel = pd.read_csv(file)
    elif (file := Path(f'{pasta}/Base_de_Dados.xlsx')).exists():
        stel = pd.read_excel(file, sheet_name='Stel', engine='openpyxl')
    else:
        update_stel(pasta)
        try:
            stel = pd.read_feather(Path(f'{pasta}/stel.fth'))
        except FileNotFoundError as e:
            raise ConnectionError("Base de Dados do Stel inexistente e não foi possível atualizá-la" ) from e
    return stel

In [None]:
#export
def read_radcom(pasta, update=False):
    """Lê o banco de dados salvo localmente de RADCOM. Opcionalmente o atualiza pelo Banco de Dados ANATELBDRO01."""
    if update:
        update_radcom(pasta)
    if (file := Path(f'{pasta}/radcom.fth')).exists():
        radcom = pd.read_feather(file)
    elif (file := Path(f'{pasta}/radcom.csv')).exists():
        radcom = pd.read_csv(file)
    elif (file := Path(f'{pasta}/Base_de_Dados.xlsx')).exists():
         radcom = pd.read_excel(file, sheet_name='Radcom', engine='openpyxl')
    else:
        update_radcom(pasta)
        try:
            radcom = pd.read_feather(Path(f'{pasta}/radcom.fth'))
        except FileNotFoundError as e:
            raise ConnectionError("Base de Dados do Stel inexistente e não foi possível atualizá-la" ) from e
    return radcom

In [None]:
#export
def row2dict(row):
    """Receives a json row and return the dictionary from it"""
    #dict_result = {k: row[k] for k in cols}
    #dict_result['Número da Estação'] = row['administrativo']['@numero_estacao']
    return {k:v for k,v in row.items()}

def dict2cols(df, reject=()):
    """Recebe um dataframe com dicionários nas células e extrai os dicionários como colunas
       Opcionalmente ignora e exclue as colunas em reject
       """    
    for column in df.columns:
        if column in reject:
            df.drop(column, axis=1, inplace=True)
            continue
        if type(df[column].iloc[0]) == collections.OrderedDict:
            try:
                new_df = pd.DataFrame(df[column].apply(row2dict).tolist())
                df = pd.concat([df, new_df], axis=1)
                df.drop(column, axis=1, inplace=True)
            except AttributeError:
                continue
    return df

def parse_plano_basico(row, cols=COL_PB):
    """Receives a json row and filter the column in `cols`"""
    return {k: row[k] for k in cols}

def read_estações(path):
    df = pdx.read_xml(path, ['estacao_rd'])
    df = pd.DataFrame(df['row'].apply(row2dict).tolist()).replace('', pd.NA)
    df = dict2cols(df)
    df = df.loc[:, COL_ESTACOES]
    df.columns = NEW_ESTACOES
    return df
    #df['Número da Estação'] = row['administrativo']['@numero_estacao']

def read_plano_basico(path):
    df = pdx.read_xml(path, ['plano_basico'])
    df = pd.DataFrame(df['row'].apply(row2dict).tolist()).replace('', pd.NA)
    df = dict2cols(df, REJECT_ESTACOES)
    df.loc[df['@Frequência'].isna(), '@Frequência'] = df.loc[df['@Frequência'].isna(), '@Frequencia']
    df.drop('@Frequencia', axis=1, inplace=True)
    df.loc[df['@País'].isna(), '@País'] = df.loc[df['@País'].isna(), '@Pais']
    df.drop('@Pais', axis=1, inplace=True)
    df = df.loc[:, COL_PB]
    df.columns = NEW_PB
    return df
#     df['@Longitude'] = df['@Longitude'].str.replace(',', '.').astype('float')
#     df['@Latitude'] = df['@Latitude'].str.replace(',', '.').astype('float')
#     df['@Frequência'] = df['@Frequência'].str.replace(',', '.').astype('float')
    

def read_historico(path):
    regex = r"\s([a-zA-Z]+)=\'{1}([\w\-\ :\.]*)\'{1}"
    with ZipFile(path) as xmlzip:
        with xmlzip.open('documento_historicos.xml', 'r') as xml:
            xml_list = xml.read().decode().split('\n')[2:-1]
    dict_list = []
    for item in xml_list:
        matches = re.finditer(regex, item, re.MULTILINE)
        dict_list.append(dict(match.groups() for match in matches))
    df = pd.DataFrame(dict_list)
    df = df[(df.tipoDocumento == 'Ato') & (df.razao == 'Autoriza o Uso de Radiofrequência')].reset_index()
    df = df.loc[:, ['id', 'numeroDocumento', 'orgao', 'dataDocumento']]
    df.columns = ['Id', 'Num_Ato', 'Órgao', 'Data_Ato']
    df['Data_Ato'] = pd.to_datetime(df.Data_Ato)    
    return df.sort_values('Data_Ato').groupby('Id').last().reset_index()

In [None]:
#export
def read_mosaico(pasta, update=False):
    if update:
        update_mosaico(pasta)
        estações = read_estações(f'{pasta}/estações.zip', ['estacao_rd'])
        plano_basico = read_plano_basico(f'{pasta}/plano_basico.zip', ['plano_basico'])
        historico = read_historico(f'{pasta}/historico.zip')
        df = pd.merge(estações, plano_basico, on='Id').merge(historico, on='Id')
        df.to_feather(f'{pasta}/mosaico.fth')
        return df
    if not (file := Path(f'{pasta}/mosaico.fth')).exists():
            return read_mosaico(pasta, update=True)
    return pd.read_feather(f'{pasta}/mosaico.fth')

In [None]:
from nbdev.export import notebook2script; notebook2script()

Converted eda.ipynb.
Converted filter.ipynb.
Converted index.ipynb.
Converted parser.ipynb.
Converted queries.ipynb.
Converted tests.ipynb.
