In [0]:
%pip install polars==1.0.0
%pip install duckdb==1.2.1
%pip install odfpy

Python interpreter will be restarted.
Collecting polars==1.0.0
  Downloading polars-1.0.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (31.0 MB)
Installing collected packages: polars
Successfully installed polars-1.0.0
Python interpreter will be restarted.
Python interpreter will be restarted.
Collecting duckdb==1.2.1
  Downloading duckdb-1.2.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (20.2 MB)
Installing collected packages: duckdb
Successfully installed duckdb-1.2.1
Python interpreter will be restarted.
Python interpreter will be restarted.
Collecting odfpy
  Downloading odfpy-1.4.1.tar.gz (717 kB)
Building wheels for collected packages: odfpy
  Building wheel for odfpy (setup.py): started
  Building wheel for odfpy (setup.py): finished with status 'done'
  Created wheel for odfpy: filename=odfpy-1.4.1-py2.py3-none-any.whl size=160693 sha256=e682041b3659ebdd2d9e834af52190de12ece0fef4a3b798e04ed99381237e89
  Stored in directory: /root/.cache/pip/wheels/20/ff

Configurações iniciais de módulos, diretórios e URLs com os dados que deverão ser coletados para posterior rotina de dados.

Os dados serão utilizados de repositórios públicos da **Anatel** e **IBGE**

In [0]:
import polars as pl
import pandas as pd
import duckdb as db
from duckdb.typing import *
import os, shutil, sys
from pathlib import Path 
from zipfile import ZipFile
import subprocess as sub
import numpy as np
from datetime import *
from dateutil.relativedelta import relativedelta
import unicodedata
import re

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry


STG_PATH = '/_temp'
DATABASE = os.path.join(STG_PATH, 'database')
TMP_FILES = os.path.join(STG_PATH, 'tmp_files')

if not os.path.exists(STG_PATH):
    os.mkdir(STG_PATH)

URL_RQUAL_IND = 'https://www.anatel.gov.br/dadosabertos/paineis_de_dados/qualidade/indicadores_rqual.zip'
URL_ESTACOES_SMP = 'https://www.anatel.gov.br/dadosabertos/paineis_de_dados/outorga_e_licenciamento/estacoes_smp.zip'
URL_IBGE_MUNICIPIOS = 'https://geoftp.ibge.gov.br/organizacao_do_territorio/estrutura_territorial/divisao_territorial/2023/DTB_2023.zip'
URL_ANATEL_AREAS_LOCAIS = 'https://www.anatel.gov.br/dadosabertos/paineis_de_dados/areastarifarias/areaslocais.zip'



Funções auxiliares que serão utilizadas para ajudar na normalização e transformação dos dados.

In [0]:
#Leitura do valor em Bytes para "HumanReadable"
def sizeof_fmt(num, suffix="B"):
    for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"

#Coleta Dados URL
def get_file_url(TMP_FILES, URL_NAME, proxy=False):
    name_file = os.path.basename(URL_NAME)
    file_path = os.path.join(TMP_FILES,name_file)
    r = requests.get(URL_NAME, stream=True, verify=False, proxies=(proxy_dict if proxy else {}))
    if r.ok:
        print("saving to", os.path.abspath(file_path))
        with open(file_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024 * 8):
                if chunk:
                    f.write(chunk)
                    f.flush()
                    os.fsync(f.fileno())
    else:  # HTTP status code 4XX/5XX
        print("Download failed: status code {}\n{}".format(r.status_code, r.text))


# Funções auxiliares para limpar nomes de colunas
def remove_accents(text):
    return unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')

def only_alphanum(text):
    return re.sub(r'[^a-zA-Z0-9 ]', ' ', text)

def non_alphanum_underscore(text):
    return re.sub(r'\s+', '_', text.strip())

def dashes2underscore(text):
    return re.sub(r'[-]', '_', text.strip())


for v_paths in [DATABASE, TMP_FILES]:
    if os.path.exists(v_paths):
        shutil.rmtree(v_paths)
        os.mkdir(v_paths)
    else:
        os.mkdir(v_paths)

#definição do apontamento e diretório onde serão armazenados o metadados do duckdb
Path(DATABASE).mkdir(parents=True, exist_ok=True)
db_file_path = os.path.join(DATABASE, "db_portalanatel_ind_rqual.duckdb")

Obtendo os dados pelas suas respectivas URLs de acesso

In [0]:
get_file_url(TMP_FILES, URL_RQUAL_IND)
get_file_url(TMP_FILES, URL_ESTACOES_SMP)
get_file_url(TMP_FILES, URL_IBGE_MUNICIPIOS)
get_file_url(TMP_FILES, URL_ANATEL_AREAS_LOCAIS)

saving to /_temp/tmp_files/indicadores_rqual.zip
saving to /_temp/tmp_files/estacoes_smp.zip
saving to /_temp/tmp_files/DTB_2023.zip
saving to /_temp/tmp_files/areaslocais.zip


Transformação e pré-processamento dos dados de múnicipios brasileiros obtidos pelo **IBGE**

In [0]:
#Arquivos Base Municípios IBGE
filename_ibge = os.path.join(TMP_FILES,os.path.basename(URL_IBGE_MUNICIPIOS))
with ZipFile(filename_ibge, 'r') as zip_ref:
    source = zip_ref.open('DTB_2023/RELATORIO_DTB_BRASIL_MUNICIPIO.ods')
    target = open(os.path.join(TMP_FILES, 'RELATORIO_DTB_BRASIL_MUNICIPIO.ods'), "wb")
    with source, target:
            shutil.copyfileobj(source, target)
df_dtb_municipio = pd.read_excel(os.path.join(TMP_FILES,"RELATORIO_DTB_BRASIL_MUNICIPIO.ods"), skiprows=6)
df_dtb_municipio.head()

#Normaliza nome das Colunas encontradas
tmp_cols = []
for col in df_dtb_municipio.columns.to_list():
    tmp_cols.append(non_alphanum_underscore(only_alphanum(remove_accents(col.upper()))))
df_dtb_municipio.columns = tmp_cols; del tmp_cols

conn = db.connect(db_file_path)
conn.sql("DROP TABLE IF EXISTS DTB_IBGE_MUNICIPIOS")
conn.sql("CREATE TABLE DTB_IBGE_MUNICIPIOS AS SELECT * FROM df_dtb_municipio")
conn.close()

Transformação e pré-processamento dos dados disponibilizados pela **Anatel**


In [0]:
#Arquivos Anatel - Códigos de Areas Locais
filename_cn = os.path.join(TMP_FILES,os.path.basename(URL_ANATEL_AREAS_LOCAIS))

with ZipFile(filename_cn, 'r') as zip_ref:
    source = zip_ref.open('CODIGOS_NACIONAIS_PGCN.csv')
    target = open(os.path.join(TMP_FILES, 'CODIGOS_NACIONAIS_PGCN.csv'), "wb")
    with source, target:
            shutil.copyfileobj(source, target)
df_arealocal = pd.read_csv(os.path.join(TMP_FILES,"CODIGOS_NACIONAIS_PGCN.csv"), sep=';')
df_arealocal.head()

#Normaliza nome das Colunas encontradas
tmp_cols = []
for col in df_arealocal.columns.to_list():
    tmp_cols.append(dashes2underscore(non_alphanum_underscore(only_alphanum(remove_accents(col.upper())))))
df_arealocal.columns = tmp_cols; del tmp_cols

conn = db.connect(db_file_path)
conn.sql("DROP TABLE IF EXISTS AREALOCAL")
conn.sql("CREATE TABLE AREALOCAL AS SELECT * FROM df_arealocal")
conn.close()

In [0]:
#Arquivos Base Anatel - Indicadores
filename_indicadores = os.path.join(TMP_FILES,os.path.basename(URL_RQUAL_IND))

#descompacta
ZipFile(filename_indicadores, 'r').extractall(TMP_FILES)

#trata_indicadores
csv_file = os.path.normpath(os.path.join(TMP_FILES, ZipFile(filename_indicadores).namelist()[0]))
parquet_file = dashes2underscore(os.path.normpath(os.path.join(TMP_FILES, csv_file.replace('.csv','.parquet'))))
print(f'Tratando o arquivo {os.path.basename(csv_file)} | tamanho do arquivo {sizeof_fmt(os.path.getsize(csv_file))};')

df_schema = pl.scan_csv(csv_file, separator=';',
                        encoding='utf8', has_header=True,
                        infer_schema_length= 10000, null_values=['NÃO IDENTIFICADA'],
                        n_rows=100000).collect_schema()

#Normaliza nome das Colunas encontradas
tmp_dict = {}
for k, v in df_schema.items():
    tmp_dict[non_alphanum_underscore(only_alphanum(remove_accents(k))).upper()] = df_schema[k]
df_schema = tmp_dict.copy(); del tmp_dict

#Realiza leitura do arquivo CSV via lazyFrame com Polars e converte para arquivo Parquet
pl.scan_csv(csv_file, separator=';',
            encoding='utf8', has_header=True,
            infer_schema_length= 10000, null_values=['NÃO IDENTIFICADA'],
            schema=df_schema).sink_parquet(
                                        parquet_file,
                                        compression="zstd",
                                        row_group_size=100_000
                                        )
print(f'Arquivo parquet gerado {os.path.basename(parquet_file)} | tamanho do arquivo {sizeof_fmt(os.path.getsize(parquet_file))};')

# Carregando o dataset para Transformações no DuckDB
conn = db.connect(db_file_path)
tbl_name = non_alphanum_underscore(os.path.splitext(os.path.basename(parquet_file))[0].upper())
conn.execute(f"DROP TABLE IF EXISTS {tbl_name}")
conn.execute(f"CREATE TABLE {tbl_name} AS SELECT * FROM parquet_scan('{parquet_file}')")
conn.close()

Tratando o arquivo Tabela_CSV_Indicadores_RQUAL.csv | tamanho do arquivo 1.8GiB;
Arquivo parquet gerado Tabela_CSV_Indicadores_RQUAL.parquet | tamanho do arquivo 159.6MiB;


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [0]:
#Arquivos Base Anatel - Estações Serviço Móvel - ERBs
filename_base_sites = os.path.join(TMP_FILES,os.path.basename(URL_ESTACOES_SMP))

#descompacta
ZipFile(filename_base_sites, 'r').extractall(TMP_FILES)

#trata_indicadores
csv_file = os.path.normpath(os.path.join(TMP_FILES, ZipFile(filename_base_sites).namelist()[0]))
parquet_file = dashes2underscore(os.path.normpath(os.path.join(TMP_FILES, csv_file.replace('.csv','.parquet'))))
print(f'Tratando o arquivo {os.path.basename(csv_file)} | tamanho do arquivo {sizeof_fmt(os.path.getsize(csv_file))};')

df_schema = pl.scan_csv(csv_file, separator=';',
                        encoding='utf8', has_header=True,
                        infer_schema_length= 100000, null_values=['NÃO IDENTIFICADA','#N/A',','],
                        n_rows=100000).collect_schema()

#Normaliza nome das Colunas encontradas
tmp_dict = {}
for k, v in df_schema.items():
    tmp_dict[non_alphanum_underscore(only_alphanum(remove_accents(k))).upper()] = df_schema[k]
df_schema = tmp_dict.copy(); del tmp_dict

#Realiza leitura do arquivo CSV via lazyFrame com Polars e converte para arquivo Parquet
pl.scan_csv(csv_file, separator=';',
            encoding='utf8', has_header=True,
            infer_schema_length= 10000, null_values=['NÃO IDENTIFICADA','#N/A',','],
            schema=df_schema).sink_parquet(
                                        parquet_file,
                                        compression="zstd",
                                        row_group_size=100_000
                                        )
print(f'Arquivo parquet gerado {os.path.basename(parquet_file)} | tamanho do arquivo {sizeof_fmt(os.path.getsize(parquet_file))};')

# Carregando o dataset para Transformações no DuckDB
conn = db.connect(db_file_path)
tbl_name = non_alphanum_underscore(os.path.splitext(os.path.basename(parquet_file))[0].upper())
conn.execute(f"DROP TABLE IF EXISTS {tbl_name}")
conn.execute(f"CREATE TABLE {tbl_name} AS SELECT * FROM parquet_scan('{parquet_file}')")
conn.close()

Tratando o arquivo Estacoes_SMP.csv | tamanho do arquivo 314.2MiB;
Arquivo parquet gerado Estacoes_SMP.parquet | tamanho do arquivo 29.1MiB;


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [0]:
#Remove os arquivos que não serão utilizados para ingestão, aliviando espaço em disco do servidor
for file in os.listdir(TMP_FILES):
    if file.endswith((".csv",".pdf",".ods")):
        os.remove(os.path.join(TMP_FILES, file))

Processamento via **duckdb** para ajustes pontuais em arquivos.

Optei por utilizar o duckdb como uma camada _Bronze_ , tendo os dados parquet transformado anteriormente em sua camada analítica SQL.


In [0]:
conn = db.connect(db_file_path)

v_sql_smp = '''
DROP VIEW IF EXISTS V_RQUAL_ANATEL_SMP;

CREATE OR REPLACE VIEW V_RQUAL_ANATEL_SMP as
SELECT
 try_cast(MESDATPER as INTEGER) MESDATPER, ANO, MES, REPLACE(SERVICO, 'Telefonia Móvel', 'SMP') SERVICO, PRESTADORA, CODIGO_IBGE, UF, NOME_DO_MUNICIPIO,
 max("IND1_max(RESULTADO)") IND1,
 max("IND2_max(RESULTADO)") IND2,
 max("IND3_max(RESULTADO)") IND3,
 max("IND4_max(RESULTADO)") IND4,
 max("IND5_max(RESULTADO)") IND5,
 max("IND6_max(RESULTADO)") IND6,
 max("IND7_max(RESULTADO)") IND7,
 max("IND8_max(RESULTADO)") IND8,
 least(max("IND4_MAX(NUMERO_MEDIDAS)"), max("IND5_MAX(NUMERO_MEDIDAS)"), max("IND6_MAX(NUMERO_MEDIDAS)"), max("IND7_MAX(NUMERO_MEDIDAS)")) NUMERO_MEDIDAS,
 least(max("IND4_MAX(NUMERO_COLETORES)"), max("IND5_MAX(NUMERO_COLETORES)"), max("IND6_MAX(NUMERO_COLETORES)"), max("IND7_MAX(NUMERO_COLETORES)")) NUMERO_COLETORES,
 case when least(max("IND4_MAX(NUMERO_MEDIDAS)"), max("IND5_MAX(NUMERO_MEDIDAS)"), max("IND6_MAX(NUMERO_MEDIDAS)"), max("IND7_MAX(NUMERO_MEDIDAS)")) >= 109 then 1 else 0 end VALIDADE_ESTATISTICA
from
(
	select
	try_cast(i.ANO as varchar)||case when length(try_cast(i.MES as varchar))=1 then '0'||try_cast(i.MES as varchar) else try_cast(i.MES as varchar) end MESDATPER,
	i.ANO,
	i.MES,
	i.SERVICO,
	i.PRESTADORA,
	i.UF,
	i.MUNICIPIO NOME_DO_MUNICIPIO,
	i.CODIGO_IBGE,
	i.INDICADOR IND,
	i.RESULTADO,
	i.LIMITE_INFERIOR LIM_INFERIOR,
	i.LIMITE_SUPERIOR LIM_SUPERIOR,
	i.ERRO_AMOSTRAL,
	i.NUMERO_DE_MEDIDAS NUMERO_MEDIDAS,
	i.NUMERO_DE_COLETORES NUMERO_COLETORES
	from
	TABELA_CSV_INDICADORES_RQUAL i 
	where i.SERVICO = 'Telefonia Móvel'
	and i.TIPO = 'Indicador IQS'
)
pivot(
    MAX(RESULTADO),
    MAX(NUMERO_MEDIDAS),
    MAX(NUMERO_COLETORES)
    for ind in
    (
       'IND1' IND1,
       'IND2' IND2,
       'IND3' IND3,
       'IND4' IND4,
       'IND5' IND5,
       'IND6' IND6,
       'IND7' IND7,
       'IND8' IND8
    )
)
 group by MESDATPER, ANO, MES, SERVICO, PRESTADORA, CODIGO_IBGE, UF, NOME_DO_MUNICIPIO
 order by MESDATPER, UF, CODIGO_IBGE
;
'''

conn.execute(v_sql_smp)

v_tbl_smp = '''
select MESDATPER, ANO, MES, REPLACE(SERVICO, 'Telefonia Móvel', 'SMP') SERVICO, PRESTADORA, CODIGO_IBGE, UF, NOME_DO_MUNICIPIO,
IND1, IND2, IND3, IND4, IND5, IND6, IND7, IND8, NUMERO_MEDIDAS, NUMERO_COLETORES, VALIDADE_ESTATISTICA
from V_RQUAL_ANATEL_SMP
'''

df_rqual_smp = conn.execute(v_tbl_smp).df()
df_estacoes_smp = conn.execute('select * from ESTACOES_SMP').df()

df_ibge = conn.execute('select * from DTB_IBGE_MUNICIPIOS').df()
df_arealocal = conn.execute('select * from AREALOCAL').df()

conn.close()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Criação da camada _Silver_ no Spark e realizando a ingestão dos dados;

Estamos definindo em sequencia duas Tabelas como "Fatos" e duas dimensões (IBGE e Area Local)

In [0]:
%sql 
DROP DATABASE IF EXISTS l_silver CASCADE;
CREATE DATABASE l_silver;

In [0]:
#Cria tabela F_RQUAL_SMP
rqual_smp_spark_df = spark.createDataFrame(df_rqual_smp)
rqual_smp_spark_df.write.mode("overwrite").saveAsTable("l_silver.f_rqual_smp")

#Cria tabela F_ESTACOES_SMP
estacoes_smp_spark_df = spark.createDataFrame(df_estacoes_smp)
estacoes_smp_spark_df.write.mode("overwrite").saveAsTable("l_silver.f_estacoes_smp")

#Cria tabela D_IBGE
df_ibge_subset = df_ibge[['CODIGO_MUNICIPIO_COMPLETO', 'NOME_MUNICIPIO', 'UF', 'NOME_UF']].copy()
df_ibge_subset.columns = ['COD_IBGE', 'MUNICIPIO', 'COD_UF', 'UF']
ibge_spark_df = spark.createDataFrame(df_ibge_subset)
ibge_spark_df.write.mode("overwrite").saveAsTable("l_silver.d_ibge")

#Cria tabela D_AREALOCAL
df_arealocal_subset = df_arealocal[['CO_MUNICIPIO_IBGE', 'CN']].copy()
df_arealocal_subset.columns = ['COD_IBGE', 'CN']
arealocal_spark_df = spark.createDataFrame(df_arealocal_subset)
arealocal_spark_df.write.mode("overwrite").saveAsTable("l_silver.d_arealocal")

Criação da camada _Gold_  no Spark e realizando a ingestão dos dados;

Estamos definindo em sequencia duas Tabelas como "Fatos" e duas dimensões (IBGE e Area Local)

In [0]:
%sql 
DROP DATABASE IF EXISTS l_gold CASCADE;
CREATE DATABASE l_gold;

Criação da view Erbs_BR - Resultado consolidado de ERBs a nível nacional

In [0]:
%sql 
/* Criar visualização do resultado em agregação a nível País - Brasil */
create or replace view l_gold.vw_erbs_br as
  select *
  from
  (
    SELECT PRESTADORA,
                  TCN,
                  SUM(QTD_ESTAC) AS ERBS
            FROM
              (SELECT DISTINCT
                  upper(bs.EMPRESA_ESTACAO) PRESTADORA,
                  count(distinct bs.numero_estacao) QTD_ESTAC,
                  bs.geracao TCN,
                  al.CN COD_AREA,
                  ib.COD_UF,
                  ib.UF,
                  ib.MUNICIPIO CIDADE,
                  ib.COD_IBGE
                  FROM l_silver.f_estacoes_smp bs
                    join l_silver.d_ibge ib on bs.CODIGO_IBGE = ib.COD_IBGE
                    left join l_silver.d_arealocal al on bs.CODIGO_IBGE = al.COD_IBGE
                  where bs.geracao is not null
                  group by
                  bs.EMPRESA_ESTACAO, bs.GERACAO, al.CN, ib.COD_UF, ib.UF, ib.MUNICIPIO, ib.COD_IBGE
                )
          group by PRESTADORA, TCN
  )
  pivot(
    max(erbs) AS qtd_erbs
        FOR tcn IN ('2G' AS TCN_2G, '3G' AS TCN_3G, '4G' AS TCN_4G, '5G' AS TCN_5G)
      )
;

In [0]:
%sql
select * from l_gold.vw_erbs_br
limit 10;

PRESTADORA,TCN_2G,TCN_3G,TCN_4G,TCN_5G
IEZ! TELECOM LTDA.,,,11.0,11.0
VIVO,16324.0,26935.0,34212.0,16065.0
SERCOMTEL,38.0,42.0,,
GIGA+,,,12.0,
ALGAR,398.0,615.0,557.0,185.0
TIM,18165.0,19868.0,30809.0,12720.0
UNIFIQUE TELECOMUNICACOES S/A,,,160.0,140.0
BRISANET,,,1778.0,1506.0
CLARO,19653.0,24767.0,28039.0,11741.0


Criação da view Erbs_UF - Resultado consolidado de ERBs a nível de unidades federativas (estados da união)

In [0]:
%sql 
/* Criar visualização do resultado em agregação a nível UF - Brasil */
create or replace view l_gold.vw_erbs_uf as
  select *
  from
  (
    SELECT 
          UF, COD_UF, COD_AREA,
          PRESTADORA,
          TCN,
          SUM(QTD_ESTAC) AS ERBS
            FROM
              (SELECT DISTINCT
                  upper(bs.EMPRESA_ESTACAO) PRESTADORA,
                  count(distinct bs.numero_estacao) QTD_ESTAC,
                  bs.geracao TCN,
                  al.CN COD_AREA,
                  ib.COD_UF,
                  ib.UF,
                  ib.MUNICIPIO CIDADE,
                  ib.COD_IBGE
                  FROM l_silver.f_estacoes_smp bs
                    join l_silver.d_ibge ib on bs.CODIGO_IBGE = ib.COD_IBGE
                    left join l_silver.d_arealocal al on bs.CODIGO_IBGE = al.COD_IBGE
                  where bs.geracao is not null
                  group by
                  bs.EMPRESA_ESTACAO, bs.GERACAO, al.CN, ib.COD_UF, ib.UF, ib.MUNICIPIO, ib.COD_IBGE
                )
          group by 
          UF, COD_UF, COD_AREA,
          PRESTADORA, TCN
  )
  pivot(
    max(erbs) AS qtd_erbs
        FOR tcn IN ('2G' AS TCN_2G, '3G' AS TCN_3G, '4G' AS TCN_4G, '5G' AS TCN_5G)
      )
;

In [0]:
%sql
select * from l_gold.vw_erbs_uf
limit 10;

UF,COD_UF,COD_AREA,PRESTADORA,TCN_2G,TCN_3G,TCN_4G,TCN_5G
Minas Gerais,31,37,VIVO,134.0,233.0,283,60
Minas Gerais,31,32,CLARO,238.0,260.0,311,83
Minas Gerais,31,34,ALGAR,252.0,360.0,348,138
Rio Grande do Sul,43,51,TIM,532.0,527.0,1069,263
Acre,12,68,VIVO,54.0,89.0,106,60
Goiás,52,61,VIVO,56.0,77.0,141,63
Bahia,29,73,TIM,107.0,157.0,297,38
Paraná,41,42,TIM,134.0,245.0,305,75
Santa Catarina,42,49,UNIFIQUE TELECOMUNICACOES S/A,,,10,10
Goiás,52,61,CLARO,102.0,147.0,149,45


Criação da view Erbs_Cid - Resultado consolidado de ERBs a nível dos municípios das unidades federativas (Cidades)

In [0]:
%sql 
/* Criar visualização do resultado em agregação a nível Cidades - Brasil */
create or replace view l_gold.vw_erbs_cid as
  select *
  from
  (
    SELECT 
          UF, COD_UF, COD_AREA,
          COD_IBGE, CIDADE, CAPITAL,
          PRESTADORA,
          TCN,
          SUM(QTD_ESTAC) AS ERBS
            FROM
              (SELECT DISTINCT
                  upper(bs.EMPRESA_ESTACAO) PRESTADORA,
                  count(distinct bs.numero_estacao) QTD_ESTAC,
                  bs.geracao TCN,
                  al.CN COD_AREA,
                  ib.COD_UF,
                  ib.UF,
                  ib.MUNICIPIO CIDADE,
                  ib.COD_IBGE,
                  case
                  when ib.COD_IBGE in (
                    1100205,1302603,1200401,5002704,1600303,
                    5300108,1400100,5103403,1721000,3550308,
                    2211001,3304557,1501402,5208707,2927408,
                    4205407,2111300,2704302,4314902,4106902,
                    3106200,2304400,2611606,2507507,2800308,
                    2408102,3205309
                  ) then
                  'SIM' else 'NÃO' end as CAPITAL
                  FROM l_silver.f_estacoes_smp bs
                    join l_silver.d_ibge ib on bs.CODIGO_IBGE = ib.COD_IBGE
                    left join l_silver.d_arealocal al on bs.CODIGO_IBGE = al.COD_IBGE
                  where bs.geracao is not null
                  group by
                  bs.EMPRESA_ESTACAO, bs.GERACAO, al.CN, ib.COD_UF, ib.UF, ib.MUNICIPIO, ib.COD_IBGE, CAPITAL
                )
          group by 
          UF, COD_UF, COD_AREA,
          COD_IBGE, CIDADE, CAPITAL,
          PRESTADORA, TCN
  )
  pivot(
    max(erbs) AS qtd_erbs
        FOR tcn IN ('2G' AS TCN_2G, '3G' AS TCN_3G, '4G' AS TCN_4G, '5G' AS TCN_5G)
      )
;

In [0]:
%sql
select * from l_gold.vw_erbs_cid
limit 10;

UF,COD_UF,COD_AREA,COD_IBGE,CIDADE,CAPITAL,PRESTADORA,TCN_2G,TCN_3G,TCN_4G,TCN_5G
Goiás,52,64,5205901,Corumbaíba,NÃO,CLARO,1.0,1.0,1,
Minas Gerais,31,31,3118106,Congonhas do Norte,NÃO,CLARO,1.0,1.0,1,
Maranhão,21,98,2107506,Paço do Lumiar,NÃO,BRISANET,,,1,1.0
São Paulo,35,19,3509502,Campinas,NÃO,TIM,99.0,168.0,221,163.0
Bahia,29,77,2906105,Canápolis,NÃO,VIVO,1.0,1.0,1,
Minas Gerais,31,35,3132909,Itamogi,NÃO,VIVO,1.0,1.0,1,
Piauí,22,86,2205508,José de Freitas,NÃO,VIVO,2.0,2.0,1,
Rio de Janeiro,33,21,3302700,Maricá,NÃO,TIM,17.0,19.0,23,11.0
Paraná,41,45,4110953,Itaipulândia,NÃO,TIM,3.0,4.0,4,
Minas Gerais,31,33,3139508,Manhumirim,NÃO,CLARO,1.0,1.0,1,


Criação da view Indicadores - Resultado do Plano de Melhoria de Qualidade divulgado pela Anatel
Estaremos utilizando um recorte do período a partir do último semestre de 2024 em diante.

In [0]:
%sql
create or replace table l_gold.vw_indicadores as
select 
MESDATPER, 
ind.ANO, ind.MES, ind.SERVICO,
al.CN, ib.COD_IBGE, ib.UF, ib.COD_UF, ib.MUNICIPIO CIDADE, 
ind.IND1, ind.IND2, ind.IND3, ind.IND4, ind.IND5, ind.IND6, ind.IND7, ind.IND8,
ind.NUMERO_MEDIDAS, ind.NUMERO_COLETORES, ind.VALIDADE_ESTATISTICA
    FROM
        l_silver.f_rqual_smp ind
        join l_silver.d_ibge ib on ind.CODIGO_IBGE = ib.COD_IBGE
        left join l_silver.d_arealocal al on ind.CODIGO_IBGE = al.COD_IBGE
where mesdatper >= 202407
;

num_affected_rows,num_inserted_rows


In [0]:
%sql
select * from l_gold.vw_indicadores
where mesdatper >= 202407
limit 10;

MESDATPER,ANO,MES,SERVICO,CN,COD_IBGE,UF,COD_UF,CIDADE,IND1,IND2,IND3,IND4,IND5,IND6,IND7,IND8,NUMERO_MEDIDAS,NUMERO_COLETORES,VALIDADE_ESTATISTICA
202409,2024,9,SMP,81,2612901,Pernambuco,26,São Benedito do Sul,998646,2129,994055,962962,955357,1000000,982456,992083,108,43,0
202409,2024,9,SMP,81,2612901,Pernambuco,26,São Benedito do Sul,991936,8186,997317,898148,931034,913793,931034,997754,54,32,0
202409,2024,9,SMP,81,2613008,Pernambuco,26,São Bento do Una,997864,1249,998686,895626,859525,863013,935276,1000000,5584,883,1
202409,2024,9,SMP,81,2613008,Pernambuco,26,São Bento do Una,996558,3329,998705,848356,946473,998110,900948,1000000,1534,301,1
202409,2024,9,SMP,81,2613008,Pernambuco,26,São Bento do Una,998101,2501,998383,751263,879543,845275,944151,1000000,882,237,1
202409,2024,9,SMP,81,2613107,Pernambuco,26,São Caitano,996959,5282,999023,931364,951798,991583,920553,991678,1253,540,1
202409,2024,9,SMP,81,2613107,Pernambuco,26,São Caitano,998430,2428,998697,918481,783738,844852,881503,1000000,12838,1844,1
202409,2024,9,SMP,81,2613107,Pernambuco,26,São Caitano,994030,6194,988191,803088,880382,866028,923645,1000000,387,212,1
202409,2024,9,SMP,87,2613206,Pernambuco,26,São João,998337,2338,999081,924534,969289,996803,895050,1000000,1451,328,1
202409,2024,9,SMP,87,2613206,Pernambuco,26,São João,998423,1025,998145,969746,934693,916734,957551,1000000,1220,244,1
