In [1]:
!curl ifconfig.me

35.247.249.65

In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from functools import reduce
import numpy as np
import re

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = \
'--jars gs://scriptsnatalsoul/postgresql-42.3.1.jar pyspark-shell'


In [3]:
spark = SparkSession.builder.appName('InserindoMysql')\
.config('spark.sql.caseSensitive','True')\
.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
url = '34.134.31.72'
db = 'telecomunicacoes'
port = '5432'
user = 'postgres'
password = 'Ox95F1eyft7LPBeN'

In [5]:
def loadData(table:str):
        df = spark.read \
        .format("jdbc") \
        .option("numPartitions", "4") \
        .option("fetchsize", "50000") \
        .option("url", f"jdbc:postgresql://{url}:{port}/{db}") \
        .option("dbtable", table) \
        .option("driver", "org.postgresql.Driver") \
        .option("user", user) \
        .option("password", password) \
        .load()
        return df

## --> 1. Tratamento de **dados_ibge**

In [6]:
dados_ibge = loadData('dados_ibge')
dados_ibge.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- id_municipio: integer (nullable = true)
 |-- pib: long (nullable = true)
 |-- impostos_liquidos: long (nullable = true)
 |-- va: long (nullable = true)
 |-- va_agropecuaria: long (nullable = true)
 |-- va_industria: long (nullable = true)
 |-- va_servicos: long (nullable = true)
 |-- va_adespss: long (nullable = true)



In [7]:
print('numero de entradas ->',dados_ibge.count())
dados_ibge.select([count(when(isnan(c),c)).alias(c)for c in dados_ibge.columns]).show()

                                                                                

numero de entradas -> 94616


[Stage 3:>                                                          (0 + 1) / 1]

+---+------------+---+-----------------+---+---------------+------------+-----------+----------+
|ano|id_municipio|pib|impostos_liquidos| va|va_agropecuaria|va_industria|va_servicos|va_adespss|
+---+------------+---+-----------------+---+---------------+------------+-----------+----------+
|  0|           0|  0|                0|  0|              0|           0|          0|         0|
+---+------------+---+-----------------+---+---------------+------------+-----------+----------+



                                                                                

In [8]:
dados_ibge.show(3)

[Stage 6:>                                                          (0 + 1) / 1]

+----+------------+---------+-----------------+---------+---------------+------------+-----------+----------+
| ano|id_municipio|      pib|impostos_liquidos|       va|va_agropecuaria|va_industria|va_servicos|va_adespss|
+----+------------+---------+-----------------+---------+---------------+------------+-----------+----------+
|2002|     1100015|111290995|          7549266|103741729|       27013223|     9376871|   24651113|  42700523|
|2003|     1100015|143222381|         10511613|132710768|       41079095|    12700905|   32975861|  45954907|
|2004|     1100015|173990790|         12219047|161771743|       41413428|    29963081|   39089592|  51305642|
+----+------------+---------+-----------------+---------+---------------+------------+-----------+----------+
only showing top 3 rows



                                                                                

In [9]:
DFdi = dados_ibge.select(col('ano'),
                        col('id_municipio').alias('id_ibge'),
                        regexp_replace('id_municipio', r"(^.{1,2})" , "").alias('id_municipio').cast(IntegerType()),
                        substring('id_municipio',0,2).alias('id_uf').cast(IntegerType()),
                        col('pib').alias('pib_municipio')).filter( (col('ano') == 2017) | (col('ano') == 2018) )
print('numero de entradas ->',DFdi.count())
DFdi.show(3)

                                                                                

numero de entradas -> 11140


[Stage 10:>                                                         (0 + 1) / 1]

+----+-------+------------+-----+-------------+
| ano|id_ibge|id_municipio|id_uf|pib_municipio|
+----+-------+------------+-----+-------------+
|2017|1100015|          15|   11|    485374332|
|2018|1100015|          15|   11|    499305982|
|2017|1100023|          23|   11|   2287910379|
+----+-------+------------+-----+-------------+
only showing top 3 rows



                                                                                

In [10]:
DFdi.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- id_ibge: integer (nullable = true)
 |-- id_municipio: integer (nullable = true)
 |-- id_uf: integer (nullable = true)
 |-- pib_municipio: long (nullable = true)



## --> 2. Tratamento de **cidades_ibge**

In [11]:
cidades_ibge = loadData('cidades_ibge')
cidades_ibge.printSchema()

root
 |-- UF: string (nullable = true)
 |-- COD__UF: integer (nullable = true)
 |-- COD__MUNIC: integer (nullable = true)
 |-- NOME_DO_MUNIC_PIO: string (nullable = true)
 |-- _POPULA__O_ESTIMADA_: string (nullable = true)



In [12]:
print('numero de entradas ->',cidades_ibge.count())
cidades_ibge.select([count(when(isnan(c),c)).alias(c)for c in cidades_ibge.columns]).show()

                                                                                

numero de entradas -> 5571


[Stage 14:>                                                         (0 + 1) / 1]

+---+-------+----------+-----------------+--------------------+
| UF|COD__UF|COD__MUNIC|NOME_DO_MUNIC_PIO|_POPULA__O_ESTIMADA_|
+---+-------+----------+-----------------+--------------------+
|  0|      0|         0|                0|                   0|
+---+-------+----------+-----------------+--------------------+



                                                                                

In [13]:
cidades_ibge = cidades_ibge.dropna()
cidades_ibge.count()

                                                                                

5570

In [14]:
cidades_ibge = cidades_ibge.withColumn("COD__UF", (col('COD__UF')*100000))
cidades_ibge = cidades_ibge.withColumn('id_ibge', (col('COD__UF')+col('COD__MUNIC')))
DFci = cidades_ibge
DFci.show(3)

[Stage 20:>                                                         (0 + 1) / 1]

+---+-------+----------+--------------------+--------------------+-------+
| UF|COD__UF|COD__MUNIC|   NOME_DO_MUNIC_PIO|_POPULA__O_ESTIMADA_|id_ibge|
+---+-------+----------+--------------------+--------------------+-------+
| RO|1100000|        15|Alta Floresta D'O...|              22.516|1100015|
| RO|1100000|        23|           Ariquemes|             111.148|1100023|
| RO|1100000|        31|              Cabixi|               5.067|1100031|
+---+-------+----------+--------------------+--------------------+-------+
only showing top 3 rows



                                                                                

## --> 3. Fusão das tabelas tratadas **cidades_ibge** e **dados_ibge**

In [15]:
print(' ',DFdi.columns,'\n ',DFci.columns)

  ['ano', 'id_ibge', 'id_municipio', 'id_uf', 'pib_municipio'] 
  ['UF', 'COD__UF', 'COD__MUNIC', 'NOME_DO_MUNIC_PIO', '_POPULA__O_ESTIMADA_', 'id_ibge']


In [16]:
DFjoin = DFci.join(DFdi, ['id_ibge'], 'inner').drop(*['COD__MUNIC', 'COD__UF'])
DFjoin.count()

                                                                                

11140

In [17]:
DFjoin.show(3)

                                                                                

+-------+---+-----------------+--------------------+----+------------+-----+-------------+
|id_ibge| UF|NOME_DO_MUNIC_PIO|_POPULA__O_ESTIMADA_| ano|id_municipio|id_uf|pib_municipio|
+-------+---+-----------------+--------------------+----+------------+-----+-------------+
|1707405| TO|      Esperantina|              11.280|2018|        7405|   17|     89104956|
|1707405| TO|      Esperantina|              11.280|2017|        7405|   17|     85646671|
|1715705| TO|      Palmeirante|               6.234|2018|       15705|   17|     95872968|
+-------+---+-----------------+--------------------+----+------------+-----+-------------+
only showing top 3 rows



In [18]:
print('numero de entradas ->',DFjoin.count())
DFjoin.select([count(when(isnan(c),c)).alias(c)for c in DFjoin.columns]).show()

                                                                                

numero de entradas -> 11140


                                                                                

+-------+---+-----------------+--------------------+---+------------+-----+-------------+
|id_ibge| UF|NOME_DO_MUNIC_PIO|_POPULA__O_ESTIMADA_|ano|id_municipio|id_uf|pib_municipio|
+-------+---+-----------------+--------------------+---+------------+-----+-------------+
|      0|  0|                0|                   0|  0|           0|    0|            0|
+-------+---+-----------------+--------------------+---+------------+-----+-------------+



In [19]:
DFjoin.printSchema()

root
 |-- id_ibge: integer (nullable = true)
 |-- UF: string (nullable = true)
 |-- NOME_DO_MUNIC_PIO: string (nullable = true)
 |-- _POPULA__O_ESTIMADA_: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- id_municipio: integer (nullable = true)
 |-- id_uf: integer (nullable = true)
 |-- pib_municipio: long (nullable = true)



In [20]:
DFjoin.show(3)

[Stage 54:>                 (0 + 1) / 1][Stage 55:>                 (0 + 1) / 1]

+-------+---+-----------------+--------------------+----+------------+-----+-------------+
|id_ibge| UF|NOME_DO_MUNIC_PIO|_POPULA__O_ESTIMADA_| ano|id_municipio|id_uf|pib_municipio|
+-------+---+-----------------+--------------------+----+------------+-----+-------------+
|3522158| SP|           Itaoca|               3.332|2017|       22158|   35|     39835016|
|3522158| SP|           Itaoca|               3.332|2018|       22158|   35|     41298592|
|1707405| TO|      Esperantina|              11.280|2017|        7405|   17|     85646671|
+-------+---+-----------------+--------------------+----+------------+-----+-------------+
only showing top 3 rows



                                                                                

In [21]:
DFjoin = DFjoin.select(DFjoin[0].alias('cod_ibge'),
                DFjoin[1].alias('uf'),
                DFjoin[2].alias('nome_municipio'),
                regexp_replace(DFjoin[3], "\." , "").alias('populacao').cast(IntegerType()),                       
                DFjoin[4],
                DFjoin[5].alias('cod_municipio'),
                DFjoin[6].alias('cod_uf'),
                DFjoin[7].alias('pib_municipio'))

In [22]:
DFjoin.show(3)

                                                                                

+--------+---+--------------+---------+----+-------------+------+-------------+
|cod_ibge| uf|nome_municipio|populacao| ano|cod_municipio|cod_uf|pib_municipio|
+--------+---+--------------+---------+----+-------------+------+-------------+
| 3522158| SP|        Itaoca|     3332|2017|        22158|    35|     39835016|
| 3522158| SP|        Itaoca|     3332|2018|        22158|    35|     41298592|
| 1707405| TO|   Esperantina|    11280|2017|         7405|    17|     85646671|
+--------+---+--------------+---------+----+-------------+------+-------------+
only showing top 3 rows



In [23]:
DFjoin.printSchema()

root
 |-- cod_ibge: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- nome_municipio: string (nullable = true)
 |-- populacao: integer (nullable = true)
 |-- ano: integer (nullable = true)
 |-- cod_municipio: integer (nullable = true)
 |-- cod_uf: integer (nullable = true)
 |-- pib_municipio: long (nullable = true)



In [24]:
DFjoin.write.mode("overwrite").parquet('gs://parquetcassandranatalsoul/Dados_ibge')

                                                                                

## --> 3. Tratamento de dados de qualidade

In [25]:
qualidade = loadData('qualidade')

In [26]:
qualidade.show(3)

[Stage 72:>                                                         (0 + 1) / 1]

+---------------+----------------+---------------+-------+----------------+-------------+---+----------------------------+---------+-----------------+-----------------+-------+----+------------------+----------+----------+-------+---+---+-----+---------+
|        Servi_o|Sigla_do_Servi_o|Tipo_de_Outorga|Empresa|    Raz_o_Social|         CNPJ| UF|_rea_de_C_lculo_do_Indicador|Indicador|Per_odo_de_Coleta|Meta_do_Indicador|M_s_Ano| Ano|Grupo_do_Indicador|Resultados|Descumpriu|Cumpriu| NO| NI|Total|Normativo|
+---------------+----------------+---------------+-------+----------------+-------------+---+----------------------------+---------+-----------------+-----------------+-------+----+------------------+----------+----------+-------+---+---+-----+---------+
|Telefonia Móvel|             SMP|    Autorização|    TIM|TIM CELULAR S.A.|4206050000180| SP|               Área do CN 18|    SMP12|           MENSAL|           >= 90%|      1|2018|       Atendimento|    53,28%|         1|      0|  0| 

                                                                                

In [39]:
qualidade.printSchema()

root
 |-- Servi_o: string (nullable = true)
 |-- Sigla_do_Servi_o: string (nullable = true)
 |-- Tipo_de_Outorga: string (nullable = true)
 |-- Empresa: string (nullable = true)
 |-- Raz_o_Social: string (nullable = true)
 |-- CNPJ: long (nullable = true)
 |-- UF: string (nullable = true)
 |-- _rea_de_C_lculo_do_Indicador: string (nullable = true)
 |-- Indicador: string (nullable = true)
 |-- Per_odo_de_Coleta: string (nullable = true)
 |-- Meta_do_Indicador: string (nullable = true)
 |-- M_s_Ano: integer (nullable = true)
 |-- Ano: integer (nullable = true)
 |-- Grupo_do_Indicador: string (nullable = true)
 |-- Resultados: string (nullable = true)
 |-- Descumpriu: integer (nullable = true)
 |-- Cumpriu: integer (nullable = true)
 |-- NO: integer (nullable = true)
 |-- NI: integer (nullable = true)
 |-- Total: integer (nullable = true)
 |-- Normativo: string (nullable = true)



In [41]:
dfqld = qualidade.select(col('Servi_o').alias('servico'),
                         col('Empresa').alias('empresa'),
                         col('Ano').alias('ano'),
                         col('UF').alias('uf'),
                         col('Meta_do_Indicador').alias('meta_indicador'),
                         col('Cumpriu').alias('cumpriu'),
                         col('Descumpriu').alias('descumpriu'))
dfqld.show(3)

[Stage 89:>                                                         (0 + 1) / 1]

+---------------+-------+----+---+--------------+-------+----------+
|        servico|empresa| ano| uf|meta_indicador|cumpriu|descumpriu|
+---------------+-------+----+---+--------------+-------+----------+
|Telefonia Móvel|    TIM|2018| SP|        >= 90%|      0|         1|
|Telefonia Móvel|    TIM|2018| SP|        >= 90%|      0|         1|
|Telefonia Móvel|    TIM|2018| RJ|        >= 90%|      0|         1|
+---------------+-------+----+---+--------------+-------+----------+
only showing top 3 rows



                                                                                

In [42]:
dfqld.printSchema()

root
 |-- servico: string (nullable = true)
 |-- empresa: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- meta_indicador: string (nullable = true)
 |-- cumpriu: integer (nullable = true)
 |-- descumpriu: integer (nullable = true)



In [43]:
dfqld.show(3)

[Stage 90:>                                                         (0 + 1) / 1]

+---------------+-------+----+---+--------------+-------+----------+
|        servico|empresa| ano| uf|meta_indicador|cumpriu|descumpriu|
+---------------+-------+----+---+--------------+-------+----------+
|Telefonia Móvel|    TIM|2018| SP|        >= 90%|      0|         1|
|Telefonia Móvel|    TIM|2018| SP|        >= 90%|      0|         1|
|Telefonia Móvel|    TIM|2018| RJ|        >= 90%|      0|         1|
+---------------+-------+----+---+--------------+-------+----------+
only showing top 3 rows



                                                                                

In [44]:
dfqld.select([count(when(isnan(c),c)).alias(c)for c in dfqld.columns]).show()

[Stage 91:>                                                         (0 + 1) / 1]

+-------+-------+---+---+--------------+-------+----------+
|servico|empresa|ano| uf|meta_indicador|cumpriu|descumpriu|
+-------+-------+---+---+--------------+-------+----------+
|      0|      0|  0|  0|             0|      0|         0|
+-------+-------+---+---+--------------+-------+----------+



                                                                                

In [45]:
dfqld.printSchema()

root
 |-- servico: string (nullable = true)
 |-- empresa: string (nullable = true)
 |-- ano: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- meta_indicador: string (nullable = true)
 |-- cumpriu: integer (nullable = true)
 |-- descumpriu: integer (nullable = true)



In [47]:
dfqld.write.mode("overwrite").parquet('gs://parquetcassandranatalsoul/Qualidade')

                                                                                

## --> 4. Tratamento de dados de cobertura

In [48]:
cobertura_operadoras = loadData('cobertura_operadoras')

In [49]:
dfco = cobertura_operadoras.select(col('Ano').alias('ano'), 
                                    col('Operadora').alias('operadora'),
                                    col('Tecnologia').alias('tecnologia'),
                                    col('Tipo_Setor').alias('setor'),
                                    substring("C_digo_Munic_pio",0,2).alias('cod_uf').cast(IntegerType()),
                                    regexp_replace("C_digo_Munic_pio", r"(^.{1,2})" , "").alias('cod_municipio').cast(IntegerType()),
                                    col('UF').alias('uf'),
                                    col('Regi_o').alias('regiao'),
                                    col('Domic_lios').alias('domicilios'),
                                    col('Moradores').alias('moradores'),
                                    col('Percentual_Cobertura').alias('percentual_cobertura'))
dfco.show(3)

[Stage 95:>                                                         (0 + 1) / 1]

+----+---------+----------+-----+------+-------------+---+------+----------+---------+--------------------+
| ano|operadora|tecnologia|setor|cod_uf|cod_municipio| uf|regiao|domicilios|moradores|percentual_cobertura|
+----+---------+----------+-----+------+-------------+---+------+----------+---------+--------------------+
|2021|     VIVO|        3G|Rural|    43|         5835| RS|   Sul|        30|       92|    61,4077393188376|
|2021|     VIVO|        3G|Rural|    43|         5934| RS|   Sul|        47|      151|   0,456767597340707|
|2021|     VIVO|        4G|Rural|    43|         5934| RS|   Sul|        78|      248|5,91976401546683E-02|
+----+---------+----------+-----+------+-------------+---+------+----------+---------+--------------------+
only showing top 3 rows



                                                                                

In [50]:
dfco.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- operadora: string (nullable = true)
 |-- tecnologia: string (nullable = true)
 |-- setor: string (nullable = true)
 |-- cod_uf: integer (nullable = true)
 |-- cod_municipio: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- regiao: string (nullable = true)
 |-- domicilios: integer (nullable = true)
 |-- moradores: integer (nullable = true)
 |-- percentual_cobertura: string (nullable = true)



In [51]:
dfco.write.mode("overwrite").parquet('gs://parquetcassandranatalsoul/Cobertura')

                                                                                

## --> 5. Tratamento de dados de reclamacoes

In [55]:
reclamacoes = loadData('reclamacoes')
reclamacoes.printSchema()

root
 |-- DataExtracao: string (nullable = true)
 |-- SOLICITA__ES: integer (nullable = true)
 |-- Ano: integer (nullable = true)
 |-- M_s: integer (nullable = true)
 |-- AnoM_s: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- Cidade: string (nullable = true)
 |-- CO_MUNICIPIO: string (nullable = true)
 |-- CanalEntrada: string (nullable = true)
 |-- Condi__o: string (nullable = true)
 |-- TipoAtendimento: string (nullable = true)
 |-- Servi_o: string (nullable = true)
 |-- Marca: string (nullable = true)
 |-- Assunto: string (nullable = true)
 |-- Problema: string (nullable = true)



In [64]:
dfrecl = reclamacoes.select(col('Ano').alias('ano'),
                    col('M_s').alias('mes'),
                    col('UF').alias('uf'),
                    col('Cidade').alias('cidade'),
                    regexp_replace("CO_MUNICIPIO", r"(^.{1,2})" , "").alias('cod_municipio').cast(IntegerType()),
                    col('CanalEntrada').alias('canal'),
                    col('Marca').alias('marca'),
                    col('Assunto').alias('assunto')).filter((col('Ano')==2017) |
                          (col('Ano')==2018) |
                          (col('marca')=='TIM') |
                          (col('marca')=='OI'))


In [65]:
dfrecl.printSchema()

root
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- cod_municipio: integer (nullable = true)
 |-- canal: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- assunto: string (nullable = true)



In [66]:
dfrecl.count()

                                                                                

15952407

In [67]:
dfrecl.write.mode("overwrite").parquet('gs://parquetcassandranatalsoul/Reclamacao')

                                                                                